core: add first-class drop support in LoadBalancer.

Resolves #3355

GrpclbLoadBalancer should use PickResult.withDrop() instead of PickResult.withError() when dropping RPCs.
This commit is contained in:
Kun Zhang 2017-10-06 16:23:10 -07:00 committed by GitHub
parent 291f17059e
commit 72f6d9bc08
5 changed files with 89 additions and 6 deletions

View File

@ -235,7 +235,7 @@ public abstract class LoadBalancer {
*/
@Immutable
public static final class PickResult {
private static final PickResult NO_RESULT = new PickResult(null, null, Status.OK);
private static final PickResult NO_RESULT = new PickResult(null, null, Status.OK, false);
@Nullable private final Subchannel subchannel;
@Nullable private final ClientStreamTracer.Factory streamTracerFactory;
@ -243,13 +243,16 @@ public abstract class LoadBalancer {
// Or OK if there is no error.
// subchannel being null and error being OK means RPC needs to wait
private final Status status;
// True if the result is created by withDrop()
private final boolean drop;
private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status) {
Status status, boolean drop) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = Preconditions.checkNotNull(status, "status");
this.drop = drop;
}
/**
@ -320,7 +323,8 @@ public abstract class LoadBalancer {
public static PickResult withSubchannel(
Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory) {
return new PickResult(
Preconditions.checkNotNull(subchannel, "subchannel"), streamTracerFactory, Status.OK);
Preconditions.checkNotNull(subchannel, "subchannel"), streamTracerFactory, Status.OK,
false);
}
/**
@ -339,7 +343,18 @@ public abstract class LoadBalancer {
*/
public static PickResult withError(Status error) {
Preconditions.checkArgument(!error.isOk(), "error status shouldn't be OK");
return new PickResult(null, null, error);
return new PickResult(null, null, error, false);
}
/**
* A decision to fail an RPC immediately. This is a final decision and will ignore retry
* policy.
*
* @param status the status with which the RPC will fail. Must not be OK.
*/
public static PickResult withDrop(Status status) {
Preconditions.checkArgument(!status.isOk(), "drop status shouldn't be OK");
return new PickResult(null, null, status, true);
}
/**
@ -374,12 +389,20 @@ public abstract class LoadBalancer {
return status;
}
/**
* Returns {@code true} if this result was created by {@link #withDrop withDrop()}.
*/
public boolean isDrop() {
return drop;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("subchannel", subchannel)
.add("streamTracerFactory", streamTracerFactory)
.add("status", status)
.add("drop", drop)
.toString();
}

View File

@ -631,7 +631,7 @@ public final class GrpcUtil {
}
};
}
if (!result.getStatus().isOk() && !isWaitForReady) {
if (!result.getStatus().isOk() && (result.isDrop() || !isWaitForReady)) {
return new FailingClientTransport(result.getStatus());
}
return null;

View File

@ -653,6 +653,65 @@ public class ManagedChannelImplTest {
any(Metadata.class), any(CallOptions.class));
}
@Test
public void failFastRpcFailFromErrorFromBalancer() {
subtestFailRpcFromBalancer(false, false, true);
}
@Test
public void failFastRpcFailFromDropFromBalancer() {
subtestFailRpcFromBalancer(false, true, true);
}
@Test
public void waitForReadyRpcImmuneFromErrorFromBalancer() {
subtestFailRpcFromBalancer(true, false, false);
}
@Test
public void waitForReadyRpcFailFromDropFromBalancer() {
subtestFailRpcFromBalancer(true, true, true);
}
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
// This call will be buffered by the channel, thus involve delayed transport
CallOptions callOptions = CallOptions.DEFAULT;
if (waitForReady) {
callOptions = callOptions.withWaitForReady();
} else {
callOptions = callOptions.withoutWaitForReady();
}
ClientCall<String, Integer> call1 = channel.newCall(method, callOptions);
call1.start(mockCallListener, new Metadata());
SubchannelPicker picker = mock(SubchannelPicker.class);
Status status = Status.UNAVAILABLE.withDescription("for test");
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status));
helper.updateBalancingState(READY, picker);
executor.runDueTasks();
if (shouldFail) {
verify(mockCallListener).onClose(same(status), any(Metadata.class));
} else {
verifyZeroInteractions(mockCallListener);
}
// This call doesn't involve delayed transport
ClientCall<String, Integer> call2 = channel.newCall(method, callOptions);
call2.start(mockCallListener2, new Metadata());
executor.runDueTasks();
if (shouldFail) {
verify(mockCallListener2).onClose(same(status), any(Metadata.class));
} else {
verifyZeroInteractions(mockCallListener2);
}
}
/**
* Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a
* wait-for-ready call will still be buffered.

View File

@ -77,7 +77,7 @@ final class GrpclbState {
@VisibleForTesting
static final PickResult DROP_PICK_RESULT =
PickResult.withError(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {

View File

@ -293,6 +293,7 @@ public class GrpclbLoadBalancerTest {
@Test
public void roundRobinPickerWithDrop() {
assertTrue(DROP_PICK_RESULT.isDrop());
GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider);
Subchannel subchannel = mock(Subchannel.class);
// 1 out of 2 requests are to be dropped