api: Replace ErrorPicker with FixedResultPicker

FixedResultPicker can be used in more situations. Note that
WrrLocalityLoadBalancerTest's test was changed non-trivially. The
noChildLb test was particularly nasty as it assumed
LoadBalancer.ErrorPicker had same toString() as
GracefulSwitchLoadBalancer's ErrorPicker.
This commit is contained in:
Eric Anderson 2023-09-26 15:28:29 -07:00 committed by GitHub
parent a004096b3c
commit cf4cf03d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 140 additions and 68 deletions

View File

@ -1412,6 +1412,12 @@ public abstract class LoadBalancer {
public abstract LoadBalancer newLoadBalancer(Helper helper);
}
/**
* A picker that always returns an erring pick.
*
* @deprecated Use {@code new FixedResultPicker(PickResult.withError(error))} instead.
*/
@Deprecated
public static final class ErrorPicker extends SubchannelPicker {
private final Status error;
@ -1433,4 +1439,22 @@ public abstract class LoadBalancer {
}
}
/** A picker that always returns the same result. */
public static final class FixedResultPicker extends SubchannelPicker {
private final PickResult result;
public FixedResultPicker(PickResult result) {
this.result = Preconditions.checkNotNull(result, "result");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return result;
}
@Override
public String toString() {
return "FixedResultPicker(" + result + ")";
}
}
}

View File

@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.ExperimentalApi;
@ -57,21 +56,9 @@ public final class GracefulSwitchLoadBalancer extends ForwardingLoadBalancer {
@Override
public void handleNameResolutionError(final Status error) {
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(ErrorPicker.class).add("error", error).toString();
}
}
helper.updateBalancingState(
ConnectivityState.TRANSIENT_FAILURE,
new ErrorPicker());
new FixedResultPicker(PickResult.withError(error)));
}
@Override

View File

@ -83,7 +83,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
}
protected SubchannelPicker getErrorPicker(Status error) {
return new ErrorPicker(error);
return new FixedResultPicker(PickResult.withError(error));
}
@VisibleForTesting

View File

@ -103,7 +103,8 @@ final class CdsLoadBalancer2 extends LoadBalancer {
if (cdsLbState != null && cdsLbState.childLb != null) {
cdsLbState.childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}
@ -211,7 +212,8 @@ final class CdsLoadBalancer2 extends LoadBalancer {
}
if (loopStatus != null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(loopStatus));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(loopStatus)));
return;
}
@ -223,7 +225,8 @@ final class CdsLoadBalancer2 extends LoadBalancer {
Status unavailable =
Status.UNAVAILABLE.withDescription("CDS error: found 0 leaf (logical DNS or EDS) "
+ "clusters for root cluster " + root.name);
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(unavailable)));
return;
}
@ -295,7 +298,8 @@ final class CdsLoadBalancer2 extends LoadBalancer {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

View File

@ -143,7 +143,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
if (childSwitchLb != null) {
childSwitchLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

View File

@ -197,7 +197,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}
@ -240,7 +241,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
Status.UNAVAILABLE.withCause(endpointNotFound.getCause())
.withDescription(endpointNotFound.getDescription());
}
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(endpointNotFound));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(endpointNotFound)));
if (childLb != null) {
childLb.shutdown();
childLb = null;
@ -275,7 +277,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}
}

View File

@ -126,7 +126,8 @@ final class PriorityLoadBalancer extends LoadBalancer {
}
}
if (gotoTransientFailure) {
updateOverallState(null, TRANSIENT_FAILURE, new ErrorPicker(error));
updateOverallState(
null, TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}
@ -225,8 +226,8 @@ final class PriorityLoadBalancer extends LoadBalancer {
// The child is deactivated.
return;
}
picker = new ErrorPicker(
Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority));
picker = new FixedResultPicker(PickResult.withError(
Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority)));
logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
currentPriority = null; // reset currentPriority to guarantee failover happen
tryNextPriority();

View File

@ -270,7 +270,8 @@ final class RingHashLoadBalancer extends LoadBalancer {
@Override
public void handleNameResolutionError(Status error) {
if (currentState != READY) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
}

View File

@ -114,7 +114,8 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
if (childBalancers.isEmpty()) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
}
for (LoadBalancer childBalancer : childBalancers.values()) {
childBalancer.handleNameResolutionError(error);

View File

@ -78,14 +78,14 @@ final class WrrLocalityLoadBalancer extends LoadBalancer {
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);
if (locality == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided")));
helper.updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided"))));
return false;
}
if (localityWeight == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
helper.updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
Status.UNAVAILABLE.withDescription(
"wrr_locality error: no weight provided for locality " + locality)));
"wrr_locality error: no weight provided for locality " + locality))));
return false;
}

View File

@ -365,7 +365,8 @@ public class ClusterManagerLoadBalancerTest {
config = resolvedAddresses.getLoadBalancingPolicyConfig();
if (failing) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
}
return true;
}

View File

@ -41,7 +41,7 @@ import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
@ -306,7 +306,8 @@ public class PriorityLoadBalancerTest {
assertCurrentPickerPicksSubchannel(subchannel0);
// p0 fails over to p1 immediately.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.ABORTED)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(2);
assertThat(fooHelpers).hasSize(2);
@ -345,11 +346,13 @@ public class PriorityLoadBalancerTest {
assertCurrentPickerPicksSubchannel(subchannel2);
// p2 fails but does not affect overall picker
helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper2.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerPicksSubchannel(subchannel2);
// p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(4);
assertThat(fooHelpers).hasSize(4);
@ -362,7 +365,8 @@ public class PriorityLoadBalancerTest {
// p3 fails then the picker should have error status updated
helper3.updateBalancingState(
TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("foo")));
TRANSIENT_FAILURE,
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS.withDescription("foo"))));
assertCurrentPickerReturnsError(Status.Code.DATA_LOSS, "foo");
// p2 gets back to READY
@ -390,7 +394,8 @@ public class PriorityLoadBalancerTest {
assertCurrentPickerPicksSubchannel(subchannel4);
// p0 fails over to p2 and picker is updated to p2's existing picker.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerPicksSubchannel(subchannel3);
// Deactivate child balancer get deleted.
@ -564,7 +569,8 @@ public class PriorityLoadBalancerTest {
assertCurrentPickerIsBufferPicker();
// p0 fails over to p1 immediately.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.ABORTED)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(2);
assertThat(fooHelpers).hasSize(2);
@ -591,11 +597,13 @@ public class PriorityLoadBalancerTest {
assertCurrentPickerIsBufferPicker();
// p2 fails but does not affect overall picker
helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper2.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerIsBufferPicker();
// p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(4);
assertThat(fooHelpers).hasSize(4);
@ -608,7 +616,8 @@ public class PriorityLoadBalancerTest {
// p3 fails then the picker should have error status updated
helper3.updateBalancingState(
TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("foo")));
TRANSIENT_FAILURE,
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS.withDescription("foo"))));
assertCurrentPickerReturnsError(Status.Code.DATA_LOSS, "foo");
// p2 gets back to IDLE
@ -624,7 +633,8 @@ public class PriorityLoadBalancerTest {
assertCurrentPickerIsBufferPicker();
// p0 fails over to p2 and picker is updated to p2's existing picker.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
helper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertCurrentPickerIsBufferPicker();
// Deactivate child balancer get deleted.
@ -655,7 +665,8 @@ public class PriorityLoadBalancerTest {
verify(helper, never()).refreshNameResolution();
// Simulate fallback to priority p1.
priorityHelper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
priorityHelper0.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.UNAVAILABLE)));
assertThat(fooHelpers).hasSize(2);
Helper priorityHelper1 = Iterables.getLast(fooHelpers);
priorityHelper1.refreshNameResolution();
@ -780,7 +791,8 @@ public class PriorityLoadBalancerTest {
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
}
@Override

View File

@ -39,7 +39,7 @@ import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.FixedResultPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
@ -324,10 +324,10 @@ public class WeightedTargetLoadBalancerTest {
mock(SubchannelPicker.class),
mock(SubchannelPicker.class)};
final SubchannelPicker[] failurePickers = new SubchannelPicker[]{
new ErrorPicker(Status.CANCELLED),
new ErrorPicker(Status.ABORTED),
new ErrorPicker(Status.DATA_LOSS),
new ErrorPicker(Status.DATA_LOSS)
new FixedResultPicker(PickResult.withError(Status.CANCELLED)),
new FixedResultPicker(PickResult.withError(Status.ABORTED)),
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS)),
new FixedResultPicker(PickResult.withError(Status.DATA_LOSS))
};
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
@ -463,7 +463,8 @@ public class WeightedTargetLoadBalancerTest {
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
}
@Override

View File

@ -20,6 +20,8 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -30,8 +32,9 @@ import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
@ -52,6 +55,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
@ -78,10 +82,6 @@ public class WrrLocalityLoadBalancerTest {
@Captor
private ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor;
@Captor
private ArgumentCaptor<ConnectivityState> connectivityStateCaptor;
@Captor
private ArgumentCaptor<SubchannelPicker> errorPickerCaptor;
private WrrLocalityLoadBalancer loadBalancer;
private LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
@ -153,18 +153,17 @@ public class WrrLocalityLoadBalancerTest {
// With no locality weights, we should get a TRANSIENT_FAILURE.
verify(mockHelper).getAuthority();
verify(mockHelper).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE),
isA(ErrorPicker.class));
pickerReturns(Status.Code.UNAVAILABLE));
}
@Test
public void handleNameResolutionError_noChildLb() {
loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
Status status = Status.DEADLINE_EXCEEDED.withDescription("down low");
loadBalancer.handleNameResolutionError(status);
verify(mockHelper).updateBalancingState(connectivityStateCaptor.capture(),
errorPickerCaptor.capture());
assertThat(connectivityStateCaptor.getValue()).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
assertThat(errorPickerCaptor.getValue().toString()).isEqualTo(
new ErrorPicker(Status.DEADLINE_EXCEEDED).toString());
verify(mockHelper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE),
pickerReturns(PickResult.withError(status)));
}
@Test
@ -172,11 +171,12 @@ public class WrrLocalityLoadBalancerTest {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
Status status = Status.DEADLINE_EXCEEDED.withDescription("too slow");
loadBalancer.handleNameResolutionError(status);
verify(mockHelper, never()).updateBalancingState(isA(ConnectivityState.class),
isA(ErrorPicker.class));
verify(mockWeightedTargetLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED);
verify(mockHelper, never()).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE),
pickerReturns(PickResult.withError(status)));
verify(mockWeightedTargetLb).handleNameResolutionError(status);
}
@Test
@ -224,6 +224,42 @@ public class WrrLocalityLoadBalancerTest {
.build());
}
private static SubchannelPicker pickerReturns(final PickResult result) {
return pickerReturns(new ArgumentMatcher<PickResult>() {
@Override public boolean matches(PickResult obj) {
return result.equals(obj);
}
@Override public String toString() {
return "[equals " + result + "]";
}
});
}
private static SubchannelPicker pickerReturns(Status.Code code) {
return pickerReturns(new ArgumentMatcher<PickResult>() {
@Override public boolean matches(PickResult obj) {
return obj.getStatus() != null && code.equals(obj.getStatus().getCode());
}
@Override public String toString() {
return "[with code " + code + "]";
}
});
}
private static SubchannelPicker pickerReturns(final ArgumentMatcher<PickResult> matcher) {
return argThat(new ArgumentMatcher<SubchannelPicker>() {
@Override public boolean matches(SubchannelPicker picker) {
return matcher.matches(picker.pickSubchannel(mock(PickSubchannelArgs.class)));
}
@Override public String toString() {
return "[picker returns: result " + matcher + "]";
}
});
}
/**
* Create a locality-labeled address.
*/