mirror of https://github.com/grpc/grpc-java.git
xds: xds load report store implementation (#5587)
* Implemented XdsClientLoadRecorder, a ClientStreamTracer.Factory that takes a counter and produces ClientStreamTracers that aggregate into the counter in their callbacks.
* WIP: add tests for XdsLoadReportStore.
* Fix query count logic: use an atomic in-progress call counter instead of callsStarted with manual computation at snapshot time.
* Make XdsLoadReportStore thread-safe.
* Fix class and field modifiers.
* Make iterating the ConcurrentMap thread-safe.
* Fixed a missing call to the delegated streamClosed.
* Add a method to discard the ClientLoadCounter for a given locality.
* Add a test to guard that interceptPickResult does not destroy the original ClientStreamTracer.
* Added cluster-wide dropCounters and a method to record dropped requests; tests to be added later.
* Add methods to add/discard ClientLoadCounters for localities explicitly, instead of adding them implicitly in the interceptPickResult call. Unit tested.
* Make a static no-op ClientStreamTracer and ClientStreamTracer.Factory instead of creating one each time it is needed.
* Refactor interceptPickResult.
* Modified ClientLoadCounter to keep recording loads for localities no longer exposed by the balancer while they still have ongoing calls.
* Refactor tests.
* Reworded the method comments about calling in syncContext.
* Fixed not resetting dropCount to 0 after load reporting.
* Polish tests.
* Added test coverage for recording dropped requests (not concurrent).
* Added a class comment for XdsClientLoadRecorder.
parent 7c0e14318e
commit 07f9efe95e
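As a rough usage sketch of the flow described in the commit message above (not part of the commit): a locality-aware picker can wrap the picks of its delegate with interceptPickResult so that each RPC is traced into that locality's ClientLoadCounter. The LoadReportingPicker class and its wiring below are illustrative assumptions only; the real integration lives in the xDS load balancer.

package io.grpc.xds;

import io.envoyproxy.envoy.api.v2.core.Locality;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;

// Hypothetical picker wrapper for illustration only; not part of this commit.
final class LoadReportingPicker extends SubchannelPicker {
  private final SubchannelPicker delegate;     // picker produced for one locality (assumed)
  private final XdsLoadReportStore loadStore;  // one store per cluster (from this commit)
  private final Locality locality;             // locality the delegate routes to

  LoadReportingPicker(SubchannelPicker delegate, XdsLoadReportStore loadStore, Locality locality) {
    this.delegate = delegate;
    this.loadStore = loadStore;
    this.locality = locality;
  }

  @Override
  public PickResult pickSubchannel(PickSubchannelArgs args) {
    // Wrap the pick so the resulting stream is traced into the locality's ClientLoadCounter.
    return loadStore.interceptPickResult(delegate.pickSubchannel(args), locality);
  }
}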
@@ -0,0 +1,118 @@
/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.util.ForwardingClientStreamTracer;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;

/**
 * An {@link XdsClientLoadRecorder} instance records and aggregates client-side load data into a
 * {@link ClientLoadCounter} object.
 */
@ThreadSafe
final class XdsClientLoadRecorder extends ClientStreamTracer.Factory {

  private final ClientStreamTracer.Factory delegate;
  private final ClientLoadCounter counter;

  XdsClientLoadRecorder(ClientLoadCounter counter, ClientStreamTracer.Factory delegate) {
    this.counter = checkNotNull(counter, "counter");
    this.delegate = checkNotNull(delegate, "delegate");
  }

  @Override
  public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
    counter.callsInProgress.getAndIncrement();
    final ClientStreamTracer delegateTracer = delegate.newClientStreamTracer(info, headers);
    return new StreamTracer(delegateTracer);
  }

  /**
   * A {@link ClientLoadSnapshot} represents a snapshot of {@link ClientLoadCounter} to be sent as
   * part of {@link io.envoyproxy.envoy.api.v2.endpoint.ClusterStats} to the balancer.
   */
  static final class ClientLoadSnapshot {

    final long callsSucceed;
    final long callsInProgress;
    final long callsFailed;

    ClientLoadSnapshot(long callsSucceed, long callsInProgress, long callsFailed) {
      this.callsSucceed = callsSucceed;
      this.callsInProgress = callsInProgress;
      this.callsFailed = callsFailed;
    }
  }

  static final class ClientLoadCounter {

    private final AtomicLong callsInProgress = new AtomicLong();
    private final AtomicLong callsFinished = new AtomicLong();
    private final AtomicLong callsFailed = new AtomicLong();
    private boolean active = true;

    /**
     * Generates a query count snapshot and resets counts for the next snapshot.
     */
    ClientLoadSnapshot snapshot() {
      long numFailed = callsFailed.getAndSet(0);
      // The succeeded count is derived from finished minus failed; the in-progress gauge is read
      // without being reset.
      return new ClientLoadSnapshot(
          callsFinished.getAndSet(0) - numFailed,
          callsInProgress.get(),
          numFailed);
    }

    boolean isActive() {
      return active;
    }

    void setActive(boolean value) {
      active = value;
    }
  }

  private class StreamTracer extends ForwardingClientStreamTracer {

    private final ClientStreamTracer delegate;

    private StreamTracer(ClientStreamTracer delegate) {
      this.delegate = checkNotNull(delegate, "delegate");
    }

    @Override
    protected ClientStreamTracer delegate() {
      return delegate;
    }

    @Override
    public void streamClosed(Status status) {
      counter.callsFinished.getAndIncrement();
      counter.callsInProgress.getAndDecrement();
      if (!status.isOk()) {
        counter.callsFailed.getAndIncrement();
      }
      delegate().streamClosed(status);
    }
  }
}
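A minimal sketch of the snapshot semantics above, assuming a no-op delegate tracer factory (the ClientLoadCounterDemo class, its main method, and the counts in the comments are illustrative, not part of the commit): finished and failed counts reset with each snapshot, the in-progress gauge does not, and callsSucceed is derived as callsFinished minus callsFailed.

package io.grpc.xds;

import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.xds.XdsClientLoadRecorder.ClientLoadCounter;
import io.grpc.xds.XdsClientLoadRecorder.ClientLoadSnapshot;

// Hypothetical demo class; mirrors the anonymous StreamInfo used in the tests later in this diff.
final class ClientLoadCounterDemo {
  private static final ClientStreamTracer.StreamInfo STREAM_INFO =
      new ClientStreamTracer.StreamInfo() {
        @Override
        public Attributes getTransportAttrs() {
          return Attributes.EMPTY;
        }

        @Override
        public CallOptions getCallOptions() {
          return CallOptions.DEFAULT;
        }
      };

  public static void main(String[] args) {
    ClientLoadCounter counter = new ClientLoadCounter();
    // No-op delegate factory, standing in for whatever tracer factory the pick already carries.
    ClientStreamTracer.Factory noopDelegate = new ClientStreamTracer.Factory() {
      @Override
      public ClientStreamTracer newClientStreamTracer(
          ClientStreamTracer.StreamInfo info, Metadata headers) {
        return new ClientStreamTracer() {};
      }
    };
    XdsClientLoadRecorder recorder = new XdsClientLoadRecorder(counter, noopDelegate);

    // Start three RPCs; close one successfully and one with an error.
    ClientStreamTracer ok = recorder.newClientStreamTracer(STREAM_INFO, new Metadata());
    ClientStreamTracer failed = recorder.newClientStreamTracer(STREAM_INFO, new Metadata());
    recorder.newClientStreamTracer(STREAM_INFO, new Metadata());  // stays in progress
    ok.streamClosed(Status.OK);
    failed.streamClosed(Status.UNAVAILABLE);

    ClientLoadSnapshot first = counter.snapshot();
    // first.callsSucceed == 1, first.callsFailed == 1, first.callsInProgress == 1
    ClientLoadSnapshot second = counter.snapshot();
    // second.callsSucceed == 0, second.callsFailed == 0, second.callsInProgress == 1
    System.out.println(first.callsSucceed + " succeeded, " + second.callsInProgress + " still in progress");
  }
}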
@@ -0,0 +1,169 @@
/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Duration;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.Metadata;
import io.grpc.xds.XdsClientLoadRecorder.ClientLoadCounter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An {@link XdsLoadReportStore} instance holds the client-side load stats for a cluster.
 */
final class XdsLoadReportStore {

  private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER =
      new ClientStreamTracer() {
      };
  private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY =
      new ClientStreamTracer.Factory() {
        @Override
        public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
          return NOOP_CLIENT_STREAM_TRACER;
        }
      };
  private final String clusterName;
  private final ConcurrentMap<Locality, ClientLoadCounter> localityLoadCounters;
  // Cluster level dropped request counts for each category specified in the DropOverload policy.
  private final ConcurrentMap<String, AtomicLong> dropCounters;

  XdsLoadReportStore(String clusterName) {
    this(clusterName, new ConcurrentHashMap<Locality, ClientLoadCounter>(),
        new ConcurrentHashMap<String, AtomicLong>());
  }

  @VisibleForTesting
  XdsLoadReportStore(String clusterName,
      ConcurrentMap<Locality, ClientLoadCounter> localityLoadCounters,
      ConcurrentMap<String, AtomicLong> dropCounters) {
    this.clusterName = checkNotNull(clusterName, "clusterName");
    this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters");
    this.dropCounters = checkNotNull(dropCounters, "dropCounters");
  }

  /**
   * Generates a {@link ClusterStats} containing load stats at locality granularity.
   * This method should be called in the same synchronized context that
   * {@link XdsLoadBalancer#helper#getSynchronizationContext} returns.
   */
  ClusterStats generateLoadReport(Duration interval) {
    ClusterStats.Builder statsBuilder = ClusterStats.newBuilder().setClusterName(clusterName)
        .setLoadReportInterval(interval);
    for (Map.Entry<Locality, XdsClientLoadRecorder.ClientLoadCounter> entry : localityLoadCounters
        .entrySet()) {
      XdsClientLoadRecorder.ClientLoadSnapshot snapshot = entry.getValue().snapshot();
      statsBuilder
          .addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
              .setLocality(entry.getKey())
              .setTotalSuccessfulRequests(snapshot.callsSucceed)
              .setTotalErrorRequests(snapshot.callsFailed)
              .setTotalRequestsInProgress(snapshot.callsInProgress));
      // Discard counters for localities that are no longer exposed by the remote balancer and
      // have no ongoing RPCs.
      if (!entry.getValue().isActive() && snapshot.callsInProgress == 0) {
        localityLoadCounters.remove(entry.getKey());
      }
    }
    for (Map.Entry<String, AtomicLong> entry : dropCounters.entrySet()) {
      statsBuilder.addDroppedRequests(DroppedRequests.newBuilder()
          .setCategory(entry.getKey())
          .setDroppedCount(entry.getValue().getAndSet(0)));
    }
    return statsBuilder.build();
  }

  /**
   * Creates a {@link ClientLoadCounter} for the provided locality, or reactivates it if it is
   * already in this {@link XdsLoadReportStore}. This method needs to be called at locality
   * updates only for localities newly assigned in balancer discovery responses.
   * This method should be called in the same synchronized context that
   * {@link XdsLoadBalancer#helper#getSynchronizationContext} returns.
   */
  void addLocality(final Locality locality) {
    ClientLoadCounter counter = localityLoadCounters.get(locality);
    checkState(counter == null || !counter.isActive(),
        "An active ClientLoadCounter for locality %s already exists", locality);
    if (counter == null) {
      localityLoadCounters.put(locality, new ClientLoadCounter());
    } else {
      counter.setActive(true);
    }
  }

  /**
   * Deactivates the {@link ClientLoadCounter} for the provided locality in this
   * {@link XdsLoadReportStore}. Inactive {@link ClientLoadCounter}s are for localities
   * no longer exposed by the remote balancer. This method needs to be called at
   * locality updates only for localities newly removed from balancer discovery responses.
   * This method should be called in the same synchronized context that
   * {@link XdsLoadBalancer#helper#getSynchronizationContext} returns.
   */
  void removeLocality(final Locality locality) {
    ClientLoadCounter counter = localityLoadCounters.get(locality);
    checkState(counter != null && counter.isActive(),
        "No active ClientLoadCounter for locality %s exists", locality);
    counter.setActive(false);
  }

  /**
   * Intercepts an in-locality PickResult with a load-recording
   * {@link ClientStreamTracer.Factory}.
   */
  PickResult interceptPickResult(PickResult pickResult, Locality locality) {
    if (!pickResult.getStatus().isOk()) {
      return pickResult;
    }
    XdsClientLoadRecorder.ClientLoadCounter counter = localityLoadCounters.get(locality);
    if (counter == null) {
      return pickResult;
    }
    ClientStreamTracer.Factory originFactory = pickResult.getStreamTracerFactory();
    if (originFactory == null) {
      originFactory = NOOP_CLIENT_STREAM_TRACER_FACTORY;
    }
    XdsClientLoadRecorder recorder = new XdsClientLoadRecorder(counter, originFactory);
    return PickResult.withSubchannel(pickResult.getSubchannel(), recorder);
  }

  /**
   * Records that a request has been dropped with the provided category, per the drop overload
   * policy instructed by the remote balancer.
   */
  void recordDroppedRequest(String category) {
    AtomicLong counter = dropCounters.get(category);
    if (counter == null) {
      // putIfAbsent returns null if this thread installed the new counter; re-read so concurrent
      // recorders for a new category all increment the same instance.
      counter = dropCounters.putIfAbsent(category, new AtomicLong());
      if (counter == null) {
        counter = dropCounters.get(category);
      }
    }
    counter.getAndIncrement();
  }
}
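A minimal sketch of how the drop recording and report generation above might be driven (the DropAndReportSketch class, the category argument, and the ten-second interval are assumptions; the actual wiring belongs to the xDS balancer and its LRS client):

package io.grpc.xds;

import com.google.protobuf.Duration;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.Status;

// Hypothetical helper for illustration only; not part of this commit.
final class DropAndReportSketch {

  // When the balancer's DropOverload policy selects a category for a pick, record the drop so it
  // is counted in the next ClusterStats, and fail the RPC with a drop result.
  static PickResult dropPick(XdsLoadReportStore loadStore, String category) {
    loadStore.recordDroppedRequest(category);
    return PickResult.withDrop(
        Status.UNAVAILABLE.withDescription("dropped by category " + category));
  }

  // Called periodically, e.g. at the load-reporting interval the balancer asks for; drop counts
  // and finished-call counts reset with each report, in-progress counts do not.
  static ClusterStats buildReport(XdsLoadReportStore loadStore) {
    return loadStore.generateLoadReport(Duration.newBuilder().setSeconds(10).build());
  }
}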
@@ -0,0 +1,308 @@
/*
 * Copyright 2019 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.xds;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.protobuf.Duration;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.xds.XdsClientLoadRecorder.ClientLoadCounter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/** Unit tests for {@link XdsLoadReportStore}. */
public class XdsLoadReportStoreTest {
  private static final String SERVICE_NAME = "api.google.com";
  private static final ClientStreamTracer.StreamInfo STREAM_INFO =
      new ClientStreamTracer.StreamInfo() {
        @Override
        public Attributes getTransportAttrs() {
          return Attributes.EMPTY;
        }

        @Override
        public CallOptions getCallOptions() {
          return CallOptions.DEFAULT;
        }
      };
  private static final Locality TEST_LOCALITY = Locality.newBuilder()
      .setRegion("test_region")
      .setZone("test_zone")
      .setSubZone("test_subzone")
      .build();
  @Mock Subchannel fakeSubchannel;
  private ConcurrentMap<Locality, ClientLoadCounter> localityLoadCounters;
  private ConcurrentMap<String, AtomicLong> dropCounters;
  private XdsLoadReportStore loadStore;

  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);
    localityLoadCounters = new ConcurrentHashMap<>();
    dropCounters = new ConcurrentHashMap<>();
    loadStore = new XdsLoadReportStore(SERVICE_NAME, localityLoadCounters, dropCounters);
  }

  @Test
  public void loadNotRecordedForUntrackedLocality() {
    PickResult pickResult = PickResult.withSubchannel(fakeSubchannel);
    // XdsLoadReportStore does not record loads for untracked localities.
    PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, TEST_LOCALITY);
    assertThat(localityLoadCounters).hasSize(0);
    assertThat(interceptedPickResult.getStreamTracerFactory()).isNull();
  }

  @Test
  public void invalidPickResultNotIntercepted() {
    PickResult errorResult = PickResult.withError(Status.UNAVAILABLE.withDescription("Error"));
    PickResult emptyResult = PickResult.withNoResult();
    PickResult droppedResult = PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped"));
    PickResult interceptedErrorResult = loadStore.interceptPickResult(errorResult, TEST_LOCALITY);
    PickResult interceptedEmptyResult = loadStore.interceptPickResult(emptyResult, TEST_LOCALITY);
    PickResult interceptedDroppedResult = loadStore
        .interceptPickResult(droppedResult, TEST_LOCALITY);
    assertThat(localityLoadCounters).hasSize(0);
    assertThat(interceptedErrorResult.getStreamTracerFactory()).isNull();
    assertThat(interceptedEmptyResult.getStreamTracerFactory()).isNull();
    assertThat(interceptedDroppedResult.getStreamTracerFactory()).isNull();
  }

  @Test
  public void interceptPreservesOriginStreamTracer() {
    loadStore.addLocality(TEST_LOCALITY);
    ClientStreamTracer.Factory mockFactory = mock(ClientStreamTracer.Factory.class);
    ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
    when(mockFactory
        .newClientStreamTracer(any(ClientStreamTracer.StreamInfo.class), any(Metadata.class)))
        .thenReturn(mockTracer);
    PickResult pickResult = PickResult.withSubchannel(fakeSubchannel, mockFactory);
    PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, TEST_LOCALITY);
    Metadata metadata = new Metadata();
    interceptedPickResult.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, metadata)
        .streamClosed(Status.OK);
    ArgumentCaptor<ClientStreamTracer.StreamInfo> streamInfoArgumentCaptor = ArgumentCaptor
        .forClass(null);
    ArgumentCaptor<Metadata> metadataArgumentCaptor = ArgumentCaptor.forClass(null);
    verify(mockFactory).newClientStreamTracer(streamInfoArgumentCaptor.capture(),
        metadataArgumentCaptor.capture());
    assertThat(streamInfoArgumentCaptor.getValue()).isSameAs(STREAM_INFO);
    assertThat(metadataArgumentCaptor.getValue()).isSameAs(metadata);
    verify(mockTracer).streamClosed(Status.OK);
  }

  @Test
  public void loadStatsRecording() {
    Locality locality1 =
        Locality.newBuilder()
            .setRegion("test_region1")
            .setZone("test_zone")
            .setSubZone("test_subzone")
            .build();
    loadStore.addLocality(locality1);
    PickResult pickResult1 = PickResult.withSubchannel(fakeSubchannel);
    PickResult interceptedPickResult1 = loadStore.interceptPickResult(pickResult1, locality1);
    assertThat(interceptedPickResult1.getSubchannel()).isSameAs(fakeSubchannel);
    assertThat(localityLoadCounters).containsKey(locality1);
    ClientStreamTracer tracer =
        interceptedPickResult1
            .getStreamTracerFactory()
            .newClientStreamTracer(STREAM_INFO, new Metadata());
    Duration interval = Duration.newBuilder().setNanos(342).build();
    ClusterStats expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 0, 1)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    // Generating another load report should not reset the in-progress call count.
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    tracer.streamClosed(Status.OK);
    expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(locality1, 1, 0, 0)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    // Generating another load report should reset the counts for finished calls.
    expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 0, 0)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    // PickResults within the same locality should aggregate into the same counter.
    PickResult pickResult2 = PickResult.withSubchannel(fakeSubchannel);
    PickResult interceptedPickResult2 = loadStore.interceptPickResult(pickResult2, locality1);
    assertThat(localityLoadCounters).hasSize(1);
    interceptedPickResult1
        .getStreamTracerFactory()
        .newClientStreamTracer(STREAM_INFO, new Metadata())
        .streamClosed(Status.ABORTED);
    interceptedPickResult2
        .getStreamTracerFactory()
        .newClientStreamTracer(STREAM_INFO, new Metadata())
        .streamClosed(Status.CANCELLED);
    expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 2, 0)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(locality1, 0, 0, 0)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    Locality locality2 =
        Locality.newBuilder()
            .setRegion("test_region2")
            .setZone("test_zone")
            .setSubZone("test_subzone")
            .build();
    loadStore.addLocality(locality2);
    PickResult pickResult3 = PickResult.withSubchannel(fakeSubchannel);
    PickResult interceptedPickResult3 = loadStore.interceptPickResult(pickResult3, locality2);
    assertThat(localityLoadCounters).containsKey(locality2);
    assertThat(localityLoadCounters).hasSize(2);
    interceptedPickResult3
        .getStreamTracerFactory()
        .newClientStreamTracer(STREAM_INFO, new Metadata());
    List<UpstreamLocalityStats> upstreamLocalityStatsList =
        Arrays.asList(buildUpstreamLocalityStats(locality1, 0, 0, 0),
            buildUpstreamLocalityStats(locality2, 0, 0, 1));
    expectedLoadReport = buildClusterStats(interval, upstreamLocalityStatsList, null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));
  }

  @Test
  public void loadRecordingForRemovedLocality() {
    loadStore.addLocality(TEST_LOCALITY);
    assertThat(localityLoadCounters).containsKey(TEST_LOCALITY);
    PickResult pickResult = PickResult.withSubchannel(fakeSubchannel);
    PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, TEST_LOCALITY);
    ClientStreamTracer tracer = interceptedPickResult
        .getStreamTracerFactory()
        .newClientStreamTracer(STREAM_INFO, new Metadata());

    Duration interval = Duration.newBuilder().setNanos(342).build();
    ClusterStats expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(TEST_LOCALITY, 0, 0, 1)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));
    // When the remote balancer instructs removal of the locality while the client still has
    // in-progress calls to backends in it, the XdsLoadReportStore keeps tracking its load stats.
    loadStore.removeLocality(TEST_LOCALITY);
    assertThat(localityLoadCounters).containsKey(TEST_LOCALITY);
    expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(TEST_LOCALITY, 0, 0, 1)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));

    tracer.streamClosed(Status.OK);
    expectedLoadReport = buildClusterStats(interval,
        Collections.singletonList(buildUpstreamLocalityStats(TEST_LOCALITY, 1, 0, 0)), null);
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));
    assertThat(localityLoadCounters).doesNotContainKey(TEST_LOCALITY);
  }

  @Test
  public void recordingDroppedRequests() {
    Random rand = new Random();
    // Record at least one drop per category so both categories appear in the generated report.
    int numLbDrop = rand.nextInt(1000) + 1;
    int numThrottleDrop = rand.nextInt(1000) + 1;
    for (int i = 0; i < numLbDrop; i++) {
      loadStore.recordDroppedRequest("lb");
    }
    for (int i = 0; i < numThrottleDrop; i++) {
      loadStore.recordDroppedRequest("throttle");
    }
    Duration interval = Duration.newBuilder().setNanos(342).build();
    ClusterStats expectedLoadReport = buildClusterStats(interval,
        null,
        Arrays.asList(
            DroppedRequests.newBuilder()
                .setCategory("lb")
                .setDroppedCount(numLbDrop)
                .build(),
            DroppedRequests.newBuilder()
                .setCategory("throttle")
                .setDroppedCount(numThrottleDrop)
                .build()
        ));
    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport(interval));
    assertEquals(0, dropCounters.get("lb").get());
    assertEquals(0, dropCounters.get("throttle").get());
  }

  private UpstreamLocalityStats buildUpstreamLocalityStats(Locality locality, long callsSucceed,
      long callsFailed, long callsInProgress) {
    return UpstreamLocalityStats.newBuilder()
        .setLocality(locality)
        .setTotalSuccessfulRequests(callsSucceed)
        .setTotalErrorRequests(callsFailed)
        .setTotalRequestsInProgress(callsInProgress)
        .build();
  }

  private ClusterStats buildClusterStats(Duration interval,
      @Nullable List<UpstreamLocalityStats> upstreamLocalityStatsList,
      @Nullable List<DroppedRequests> droppedRequestsList) {
    ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder()
        .setClusterName(SERVICE_NAME)
        .setLoadReportInterval(interval);
    if (upstreamLocalityStatsList != null) {
      clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList);
    }
    if (droppedRequestsList != null) {
      long dropCount = 0;
      for (DroppedRequests drop : droppedRequestsList) {
        dropCount += drop.getDroppedCount();
        clusterStatsBuilder.addDroppedRequests(drop);
      }
      clusterStatsBuilder.setTotalDroppedRequests(dropCount);
    }
    return clusterStatsBuilder.build();
  }

  private void assertClusterStatsEqual(ClusterStats stats1, ClusterStats stats2) {
    assertEquals(stats1.getClusterName(), stats2.getClusterName());
    assertEquals(stats1.getLoadReportInterval(), stats2.getLoadReportInterval());
    assertEquals(stats1.getUpstreamLocalityStatsCount(), stats2.getUpstreamLocalityStatsCount());
    assertEquals(stats1.getDroppedRequestsCount(), stats2.getDroppedRequestsCount());
    assertEquals(new HashSet<>(stats1.getUpstreamLocalityStatsList()),
        new HashSet<>(stats2.getUpstreamLocalityStatsList()));
    assertEquals(new HashSet<>(stats1.getDroppedRequestsList()),
        new HashSet<>(stats2.getDroppedRequestsList()));
  }
}