diff --git a/xds/src/main/java/io/grpc/xds/XdsComms.java b/xds/src/main/java/io/grpc/xds/XdsComms.java index 5a8db8efa6..762050670b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsComms.java +++ b/xds/src/main/java/io/grpc/xds/XdsComms.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Struct; @@ -41,6 +43,8 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer.Helper; import io.grpc.ManagedChannel; import io.grpc.Status; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -49,6 +53,8 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.CheckForNull; /** * ADS client implementation. @@ -56,7 +62,14 @@ import java.util.Map; final class XdsComms { private final ManagedChannel channel; private final Helper helper; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Supplier stopwatchSupplier; + @CheckForNull + private ScheduledHandle adsRpcRetryTimer; + + // never null + private BackoffPolicy adsRpcRetryPolicy; // never null private AdsStream adsStream; @@ -169,9 +182,12 @@ final class XdsComms { final StreamObserver xdsRequestWriter; + final Stopwatch retryStopwatch = stopwatchSupplier.get().start(); + final StreamObserver xdsResponseReader = new StreamObserver() { + // Must be accessed in SynchronizationContext boolean firstEdsResponseReceived; @Override @@ -193,6 +209,7 @@ final class XdsComms { } catch (InvalidProtocolBufferException | RuntimeException e) { cancelRpc("Received invalid EDS response", e); adsStreamCallback.onError(); + scheduleRetry(); return; } @@ -254,12 +271,12 @@ final class XdsComms { new Runnable() { @Override public void run() { - // TODO: schedule retry closed = true; if (cancelled) { return; } adsStreamCallback.onError(); + scheduleRetry(); } }); } @@ -269,6 +286,42 @@ final class XdsComms { onError(Status.INTERNAL.withDescription("Server closed the ADS streaming RPC") .asException()); } + + // run in SynchronizationContext + void scheduleRetry() { + if (channel.isShutdown()) { + return; + } + + checkState( + cancelled || closed, + "Scheduling retry while the stream is neither cancelled nor closed"); + + checkState( + adsRpcRetryTimer == null, "Scheduling retry while a retry is already pending"); + + class AdsRpcRetryTask implements Runnable { + @Override + public void run() { + adsRpcRetryTimer = null; + refreshAdsStream(); + } + } + + if (firstEdsResponseReceived) { + // Reset the backoff sequence if balancer has sent the initial response + adsRpcRetryPolicy = backoffPolicyProvider.get(); + // Retry immediately + helper.getSynchronizationContext().execute(new AdsRpcRetryTask()); + return; + } + + adsRpcRetryTimer = helper.getSynchronizationContext().schedule( + new AdsRpcRetryTask(), + adsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS), + TimeUnit.NANOSECONDS, + helper.getScheduledExecutorService()); + } }; boolean cancelled; @@ -280,6 +333,7 @@ final class XdsComms { .streamAggregatedResources(xdsResponseReader); this.localityStore = localityStore; + checkState(adsRpcRetryTimer == null, "Creating AdsStream while retry is pending"); // Assuming standard mode, and send EDS request only DiscoveryRequest edsRequest = DiscoveryRequest.newBuilder() @@ -301,6 +355,7 @@ final class XdsComms { this(adsStream.adsStreamCallback, adsStream.localityStore); } + // run in SynchronizationContext void cancelRpc(String message, Throwable cause) { if (cancelled) { return; @@ -341,26 +396,42 @@ final class XdsComms { */ XdsComms( ManagedChannel channel, Helper helper, AdsStreamCallback adsStreamCallback, - LocalityStore localityStore) { + LocalityStore localityStore, BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { this.channel = checkNotNull(channel, "channel"); this.helper = checkNotNull(helper, "helper"); + this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.adsStream = new AdsStream( checkNotNull(adsStreamCallback, "adsStreamCallback"), checkNotNull(localityStore, "localityStore")); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.adsRpcRetryPolicy = backoffPolicyProvider.get(); } + // run in SynchronizationContext void refreshAdsStream() { checkState(!channel.isShutdown(), "channel is alreday shutdown"); if (adsStream.closed || adsStream.cancelled) { + cancelRetryTimer(); adsStream = new AdsStream(adsStream); } } + // run in SynchronizationContext // TODO: Change method name to shutdown or shutdownXdsComms if that gives better semantics ( // cancel LB RPC and clean up retry timer). void shutdownLbRpc(String message) { adsStream.cancelRpc(message, null); + cancelRetryTimer(); + } + + // run in SynchronizationContext + private void cancelRetryTimer() { + if (adsRpcRetryTimer != null) { + adsRpcRetryTimer.cancel(); + adsRpcRetryTimer = null; + } } /** diff --git a/xds/src/main/java/io/grpc/xds/XdsLbState.java b/xds/src/main/java/io/grpc/xds/XdsLbState.java index 7fe3739399..a2c9157a85 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLbState.java +++ b/xds/src/main/java/io/grpc/xds/XdsLbState.java @@ -25,6 +25,8 @@ import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; import io.grpc.ManagedChannel; import io.grpc.Status; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.xds.XdsComms.AdsStreamCallback; import java.util.List; @@ -55,6 +57,7 @@ class XdsLbState { private final Helper helper; private final ManagedChannel channel; private final AdsStreamCallback adsStreamCallback; + private final BackoffPolicy.Provider backoffPolicyProvider; @Nullable private XdsComms xdsComms; @@ -65,13 +68,15 @@ class XdsLbState { Helper helper, LocalityStore localityStore, ManagedChannel channel, - AdsStreamCallback adsStreamCallback) { + AdsStreamCallback adsStreamCallback, + BackoffPolicy.Provider backoffPolicyProvider) { this.balancerName = checkNotNull(balancerName, "balancerName"); this.childPolicy = childPolicy; this.helper = checkNotNull(helper, "helper"); this.localityStore = checkNotNull(localityStore, "localityStore"); this.channel = checkNotNull(channel, "channel"); this.adsStreamCallback = checkNotNull(adsStreamCallback, "adsStreamCallback"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); } final void handleResolvedAddressGroups( @@ -82,7 +87,9 @@ class XdsLbState { xdsComms.refreshAdsStream(); } else { // TODO(zdapeng): pass a helper that has the right ChannelLogger for the oobChannel - xdsComms = new XdsComms(channel, helper, adsStreamCallback, localityStore); + xdsComms = new XdsComms( + channel, helper, adsStreamCallback, localityStore, backoffPolicyProvider, + GrpcUtil.STOPWATCH_SUPPLIER); } // TODO: maybe update picker diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index c6663efe93..ffe2f144a2 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -228,7 +228,7 @@ final class XdsLoadBalancer extends LoadBalancer { } xdsLbState = new XdsLbState(newBalancerName, childPolicy, helper, localityStore, lbChannel, - adsStreamCallback); + adsStreamCallback, backoffPolicyProvider); } private static ManagedChannel initLbChannel(Helper helper, String balancerName) { diff --git a/xds/src/test/java/io/grpc/xds/XdsCommsTest.java b/xds/src/test/java/io/grpc/xds/XdsCommsTest.java index 04a9accb8b..8c65b7e6fb 100644 --- a/xds/src/test/java/io/grpc/xds/XdsCommsTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsCommsTest.java @@ -19,6 +19,8 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; @@ -54,6 +56,8 @@ import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.FakeClock; import io.grpc.internal.testing.StreamRecorder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; @@ -78,6 +82,16 @@ import org.mockito.MockitoAnnotations; */ @RunWith(JUnit4.class) public class XdsCommsTest { + private static final String EDS_TYPE_URL = + "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; + private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains("AdsRpcRetryTask"); + } + }; + @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); @Mock @@ -86,9 +100,16 @@ public class XdsCommsTest { private AdsStreamCallback adsStreamCallback; @Mock private LocalityStore localityStore; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; + @Mock + private BackoffPolicy backoffPolicy1; + @Mock + private BackoffPolicy backoffPolicy2; @Captor private ArgumentCaptor> localityEndpointsMappingCaptor; + private final FakeClock fakeClock = new FakeClock(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @Override @@ -98,7 +119,7 @@ public class XdsCommsTest { }); private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); - private final StreamRecorder streamRecorder = StreamRecorder.create(); + private StreamRecorder streamRecorder; private StreamObserver responseWriter; private ManagedChannel channel; @@ -115,6 +136,7 @@ public class XdsCommsTest { public StreamObserver streamAggregatedResources( final StreamObserver responseObserver) { responseWriter = responseObserver; + streamRecorder = StreamRecorder.create(); return new StreamObserver() { @@ -131,7 +153,6 @@ public class XdsCommsTest { @Override public void onCompleted() { streamRecorder.onCompleted(); - responseObserver.onCompleted(); } }; } @@ -148,6 +169,7 @@ public class XdsCommsTest { cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); doReturn("fake_authority").when(helper).getAuthority(); doReturn(syncContext).when(helper).getSynchronizationContext(); + doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); lbRegistry.register(new LoadBalancerProvider() { @Override @@ -170,7 +192,12 @@ public class XdsCommsTest { return null; } }); - xdsComms = new XdsComms(channel, helper, adsStreamCallback, localityStore); + doReturn(backoffPolicy1, backoffPolicy2).when(backoffPolicyProvider).get(); + doReturn(10L, 100L, 1000L).when(backoffPolicy1).nextBackoffNanos(); + doReturn(20L, 200L).when(backoffPolicy2).nextBackoffNanos(); + xdsComms = new XdsComms( + channel, helper, adsStreamCallback, localityStore, backoffPolicyProvider, + fakeClock.getStopwatchSupplier()); } @Test @@ -192,8 +219,7 @@ public class XdsCommsTest { public void standardMode_sendEdsRequest_getEdsResponse_withNoDrop() { assertThat(streamRecorder.getValues()).hasSize(1); DiscoveryRequest request = streamRecorder.getValues().get(0); - assertThat(request.getTypeUrl()) - .isEqualTo("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"); + assertThat(request.getTypeUrl()).isEqualTo(EDS_TYPE_URL); assertThat( request.getNode().getMetadata().getFieldsOrThrow("endpoints_required").getBoolValue()) .isTrue(); @@ -243,7 +269,7 @@ public class XdsCommsTest { .addLbEndpoints(endpoint22) .setLoadBalancingWeight(UInt32Value.of(2))) .build())) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .setTypeUrl(EDS_TYPE_URL) .build(); responseWriter.onNext(edsResponse); @@ -282,7 +308,7 @@ public class XdsCommsTest { .addLbEndpoints(endpoint12) .setLoadBalancingWeight(UInt32Value.of(1))) .build())) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .setTypeUrl(EDS_TYPE_URL) .build(); responseWriter.onNext(edsResponse); @@ -351,7 +377,7 @@ public class XdsCommsTest { .build()) .build()) .build())) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .setTypeUrl(EDS_TYPE_URL) .build(); responseWriter.onNext(edsResponseWithDrops); @@ -414,7 +440,7 @@ public class XdsCommsTest { .build()) .build()) .build())) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .setTypeUrl(EDS_TYPE_URL) .build(); responseWriter.onNext(edsResponseWithAllDrops); @@ -439,4 +465,191 @@ public class XdsCommsTest { verify(adsStreamCallback).onError(); verifyNoMoreInteractions(adsStreamCallback); } + + /** + * The 1st ADS RPC receives invalid response. Verify retry is scheduled. + * Verify the 2nd RPC (retry) starts after backoff. + * + *

The 2nd RPC fails with response observer onError() without receiving initial response. + * Verify retry is scheduled. Verify the 3rd PRC starts after backoff. + * + *

The 3rd PRC receives invalid initial response. Verify retry is scheduled. + * Verify the 4th PRC starts after backoff. + * + *

The 4th RPC receives valid initial response and then fails with response observer + * onError(). Verify retry is scheduled. Verify the backoff is reset. Verify the 5th PRC starts + * immediately. + * + *

The 5th RPC fails with response observer onError() without receiving initial response. + * Verify retry is scheduled. Verify the 6th PRC starts after backoff. + * + *

The 6th RPC fails with response observer onError() without receiving initial response. + * Verify retry is scheduled. Call {@link XdsComms#shutdownLbRpc(String)}, verify retry timer is + * cancelled. + */ + @Test + public void adsRpcRetry() { + StreamRecorder currentStreamRecorder = streamRecorder; + assertThat(currentStreamRecorder.getValues()).hasSize(1); + InOrder inOrder = + inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2, adsStreamCallback); + inOrder.verify(backoffPolicyProvider).get(); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + DiscoveryResponse invalidResponse = + DiscoveryResponse.newBuilder().setTypeUrl(EDS_TYPE_URL).build(); + // The 1st ADS RPC receives invalid response + responseWriter.onNext(invalidResponse); + inOrder.verify(adsStreamCallback).onError(); + assertThat(currentStreamRecorder.getError()).isNotNull(); + + // Will start backoff sequence 1 (10ns) + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Fast-forward to a moment before the retry + fakeClock.forwardNanos(9); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertSame(streamRecorder, currentStreamRecorder); + + // Then time for retry + fakeClock.forwardNanos(1); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertNotSame(currentStreamRecorder, streamRecorder); + currentStreamRecorder = streamRecorder; + assertThat(currentStreamRecorder.getValues()).hasSize(1); + + // Fail the retry after spending 4ns + fakeClock.forwardNanos(4); + // The 2nd RPC fails with response observer onError() without receiving initial response + responseWriter.onError(new Exception("fake error")); + inOrder.verify(adsStreamCallback).onError(); + + // Will start backoff sequence 2 (100ns) + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + // Fast-forward to a moment before the retry, the time spent in the last try is deducted. + fakeClock.forwardNanos(100 - 4 - 1); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertSame(streamRecorder, currentStreamRecorder); + + // Then time for retry + fakeClock.forwardNanos(1); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertNotSame(currentStreamRecorder, streamRecorder); + currentStreamRecorder = streamRecorder; + assertThat(currentStreamRecorder.getValues()).hasSize(1); + assertThat(currentStreamRecorder.getError()).isNull(); + + // Fail the retry after spending 5ns + fakeClock.forwardNanos(5); + // The 3rd PRC receives invalid initial response. + responseWriter.onNext(invalidResponse); + inOrder.verify(adsStreamCallback).onError(); + assertThat(currentStreamRecorder.getError()).isNotNull(); + + // Will start backoff sequence 3 (1000ns) + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Fast-forward to a moment before the retry, the time spent in the last try is deducted. + fakeClock.forwardNanos(1000 - 5 - 1); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertSame(streamRecorder, currentStreamRecorder); + + // Then time for retry + fakeClock.forwardNanos(1); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertNotSame(currentStreamRecorder, streamRecorder); + currentStreamRecorder = streamRecorder; + assertThat(currentStreamRecorder.getValues()).hasSize(1); + assertThat(currentStreamRecorder.getError()).isNull(); + + // The 4th RPC receives valid initial response + fakeClock.forwardNanos(6); + Locality localityProto1 = Locality.newBuilder() + .setRegion("region1").setZone("zone1").setSubZone("subzone1").build(); + LbEndpoint endpoint11 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr11").setPortValue(11)))) + .setLoadBalancingWeight(UInt32Value.of(11)) + .build(); + DiscoveryResponse validEdsResponse = DiscoveryResponse.newBuilder() + .addResources(Any.pack(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLocality(localityProto1) + .addLbEndpoints(endpoint11) + .setLoadBalancingWeight(UInt32Value.of(1))) + .build())) + .setTypeUrl(EDS_TYPE_URL) + .build(); + responseWriter.onNext(validEdsResponse); + + inOrder.verify(backoffPolicyProvider, never()).get(); + inOrder.verify(backoffPolicy2, never()).nextBackoffNanos(); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // The 4th RPC then fails with response observer onError() + fakeClock.forwardNanos(7); + responseWriter.onError(new Exception("fake error")); + + // Will reset the retry sequence and retry immediately, because balancer has responded. + inOrder.verify(backoffPolicyProvider).get(); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertNotSame(currentStreamRecorder, streamRecorder); + currentStreamRecorder = streamRecorder; + assertThat(currentStreamRecorder.getValues()).hasSize(1); + assertThat(currentStreamRecorder.getError()).isNull(); + + // The 5th RPC fails with response observer onError() without receiving initial response + fakeClock.forwardNanos(8); + responseWriter.onError(new Exception("fake error")); + inOrder.verify(adsStreamCallback).onError(); + + // Will start backoff sequence 1 (20ns) + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + // Fast-forward to a moment before the retry, the time spent in the last try is deducted. + fakeClock.forwardNanos(20 - 8 - 1); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertSame(streamRecorder, currentStreamRecorder); + + // Then time for retry + fakeClock.forwardNanos(1); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + assertNotSame(currentStreamRecorder, streamRecorder); + currentStreamRecorder = streamRecorder; + assertThat(currentStreamRecorder.getValues()).hasSize(1); + assertThat(currentStreamRecorder.getError()).isNull(); + + // Wrapping up + verify(backoffPolicyProvider, times(2)).get(); + verify(backoffPolicy1, times(3)).nextBackoffNanos(); // for 2nd, 3rd, 4th RPC + verify(backoffPolicy2, times(1)).nextBackoffNanos(); // for 6th RPC + + // The 6th RPC fails with response observer onError() without receiving initial response + responseWriter.onError(new Exception("fake error")); + inOrder.verify(adsStreamCallback).onError(); + + // Retry is scheduled + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Shutdown cancels retry + xdsComms.shutdownLbRpc("shutdown"); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + } + + @Test + public void refreshAdsStreamCancelsExistingRetry() { + responseWriter.onError(new Exception("fake error")); + verify(adsStreamCallback).onError(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + xdsComms.refreshAdsStream(); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + xdsComms.shutdownLbRpc("End test"); + } } diff --git a/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java b/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java index 9a9429bb6d..84b6125ad7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java @@ -33,6 +33,7 @@ import io.grpc.ManagedChannel; import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.internal.testing.StreamRecorder; import io.grpc.stub.StreamObserver; @@ -60,6 +61,8 @@ public class XdsLbStateTest { private AdsStreamCallback adsStreamCallback; @Mock private LocalityStore localityStore; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; private final FakeClock fakeClock = new FakeClock(); @@ -122,8 +125,9 @@ public class XdsLbStateTest { cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); doReturn(channel).when(helper).createResolvingOobChannel(BALANCER_NAME); - xdsLbState = - new XdsLbState(BALANCER_NAME, null, helper, localityStore, channel, adsStreamCallback); + xdsLbState = new XdsLbState( + BALANCER_NAME, null, helper, localityStore, channel, adsStreamCallback, + backoffPolicyProvider); } @Test