From 72f6d9bc08e2e37c890e13e036321f7049d5d6af Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 6 Oct 2017 16:23:10 -0700 Subject: [PATCH] core: add first-class drop support in LoadBalancer. Resolves #3355 GrpclbLoadBalancer should use PickResult.withDrop() instead of PickResult.withError() when dropping RPCs. --- core/src/main/java/io/grpc/LoadBalancer.java | 31 ++++++++-- .../main/java/io/grpc/internal/GrpcUtil.java | 2 +- .../grpc/internal/ManagedChannelImplTest.java | 59 +++++++++++++++++++ .../main/java/io/grpc/grpclb/GrpclbState.java | 2 +- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 1 + 5 files changed, 89 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 55152e78f3..9a84860887 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -235,7 +235,7 @@ public abstract class LoadBalancer { */ @Immutable public static final class PickResult { - private static final PickResult NO_RESULT = new PickResult(null, null, Status.OK); + private static final PickResult NO_RESULT = new PickResult(null, null, Status.OK, false); @Nullable private final Subchannel subchannel; @Nullable private final ClientStreamTracer.Factory streamTracerFactory; @@ -243,13 +243,16 @@ public abstract class LoadBalancer { // Or OK if there is no error. // subchannel being null and error being OK means RPC needs to wait private final Status status; + // True if the result is created by withDrop() + private final boolean drop; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, - Status status) { + Status status, boolean drop) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = Preconditions.checkNotNull(status, "status"); + this.drop = drop; } /** @@ -320,7 +323,8 @@ public abstract class LoadBalancer { public static PickResult withSubchannel( Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory) { return new PickResult( - Preconditions.checkNotNull(subchannel, "subchannel"), streamTracerFactory, Status.OK); + Preconditions.checkNotNull(subchannel, "subchannel"), streamTracerFactory, Status.OK, + false); } /** @@ -339,7 +343,18 @@ public abstract class LoadBalancer { */ public static PickResult withError(Status error) { Preconditions.checkArgument(!error.isOk(), "error status shouldn't be OK"); - return new PickResult(null, null, error); + return new PickResult(null, null, error, false); + } + + /** + * A decision to fail an RPC immediately. This is a final decision and will ignore retry + * policy. + * + * @param status the status with which the RPC will fail. Must not be OK. + */ + public static PickResult withDrop(Status status) { + Preconditions.checkArgument(!status.isOk(), "drop status shouldn't be OK"); + return new PickResult(null, null, status, true); } /** @@ -374,12 +389,20 @@ public abstract class LoadBalancer { return status; } + /** + * Returns {@code true} if this result was created by {@link #withDrop withDrop()}. + */ + public boolean isDrop() { + return drop; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("subchannel", subchannel) .add("streamTracerFactory", streamTracerFactory) .add("status", status) + .add("drop", drop) .toString(); } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index c3ff3649a6..af0caa3c04 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -631,7 +631,7 @@ public final class GrpcUtil { } }; } - if (!result.getStatus().isOk() && !isWaitForReady) { + if (!result.getStatus().isOk() && (result.isDrop() || !isWaitForReady)) { return new FailingClientTransport(result.getStatus()); } return null; diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index daeb59c013..19cc375289 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -653,6 +653,65 @@ public class ManagedChannelImplTest { any(Metadata.class), any(CallOptions.class)); } + @Test + public void failFastRpcFailFromErrorFromBalancer() { + subtestFailRpcFromBalancer(false, false, true); + } + + @Test + public void failFastRpcFailFromDropFromBalancer() { + subtestFailRpcFromBalancer(false, true, true); + } + + @Test + public void waitForReadyRpcImmuneFromErrorFromBalancer() { + subtestFailRpcFromBalancer(true, false, false); + } + + @Test + public void waitForReadyRpcFailFromDropFromBalancer() { + subtestFailRpcFromBalancer(true, true, true); + } + + private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) { + createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); + + // This call will be buffered by the channel, thus involve delayed transport + CallOptions callOptions = CallOptions.DEFAULT; + if (waitForReady) { + callOptions = callOptions.withWaitForReady(); + } else { + callOptions = callOptions.withoutWaitForReady(); + } + ClientCall call1 = channel.newCall(method, callOptions); + call1.start(mockCallListener, new Metadata()); + + SubchannelPicker picker = mock(SubchannelPicker.class); + Status status = Status.UNAVAILABLE.withDescription("for test"); + + when(picker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status)); + helper.updateBalancingState(READY, picker); + + executor.runDueTasks(); + if (shouldFail) { + verify(mockCallListener).onClose(same(status), any(Metadata.class)); + } else { + verifyZeroInteractions(mockCallListener); + } + + // This call doesn't involve delayed transport + ClientCall call2 = channel.newCall(method, callOptions); + call2.start(mockCallListener2, new Metadata()); + + executor.runDueTasks(); + if (shouldFail) { + verify(mockCallListener2).onClose(same(status), any(Metadata.class)); + } else { + verifyZeroInteractions(mockCallListener2); + } + } + /** * Verify that if all resolved addresses failed to connect, a fail-fast call will fail, while a * wait-for-ready call will still be buffered. diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 203f055e4e..301f65985d 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -77,7 +77,7 @@ final class GrpclbState { @VisibleForTesting static final PickResult DROP_PICK_RESULT = - PickResult.withError(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); + PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); @VisibleForTesting static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 23c2efb511..fc3beab677 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -293,6 +293,7 @@ public class GrpclbLoadBalancerTest { @Test public void roundRobinPickerWithDrop() { + assertTrue(DROP_PICK_RESULT.isDrop()); GrpclbClientLoadRecorder loadRecorder = new GrpclbClientLoadRecorder(timeProvider); Subchannel subchannel = mock(Subchannel.class); // 1 out of 2 requests are to be dropped