From 0fd8a6fcea29acef54da0e3788df43f76dfd0fd7 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 12 May 2022 10:50:58 -0700 Subject: [PATCH] xds: add OrcaOobHelperWrapper.setListener(), allow receiving reports per subchannel (#9143) remove OrcaOobHelperWrapper layer. Use OrcaOobUtil.updateListener() to set OrcaOobReportListener per each subchannel, not per helper. OrcaOobReportListener is per helper+subchannel unique. Orca stats are created when creating helper.createSubchannels(), overriding subchannel attributes to store orcaState in the orcaHelper created subchannels --- .../java/io/grpc/xds/orca/OrcaOobUtil.java | 254 ++++++++---------- .../io/grpc/xds/orca/OrcaOobUtilTest.java | 147 ++++++---- 2 files changed, 197 insertions(+), 204 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java index ed344aab73..0e88c02cd1 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; -import static io.grpc.ConnectivityState.SHUTDOWN; import com.github.xds.data.orca.v3.OrcaLoadReport; import com.github.xds.service.orca.v3.OpenRcaServiceGrpc; @@ -31,6 +30,7 @@ import com.google.common.base.Objects; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.protobuf.util.Durations; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ChannelLogger; @@ -53,12 +53,8 @@ import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; -import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -67,34 +63,13 @@ import javax.annotation.Nullable; /** * Utility class that provides method for {@link LoadBalancer} to install listeners to receive - * out-of-band backend cost metrics in the format of Open Request Cost Aggregation (ORCA). + * out-of-band backend metrics in the format of Open Request Cost Aggregation (ORCA). */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129") -public abstract class OrcaOobUtil { - +public final class OrcaOobUtil { private static final Logger logger = Logger.getLogger(OrcaPerRequestUtil.class.getName()); - private static final OrcaOobUtil DEFAULT_INSTANCE = - new OrcaOobUtil() { - @Override - public OrcaReportingHelperWrapper newOrcaReportingHelperWrapper( - LoadBalancer.Helper delegate, - OrcaOobReportListener listener) { - return newOrcaReportingHelperWrapper( - delegate, - listener, - new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER); - } - }; - - /** - * Gets an {@code OrcaOobUtil} instance that provides actual implementation of - * {@link #newOrcaReportingHelperWrapper}. - */ - public static OrcaOobUtil getInstance() { - return DEFAULT_INSTANCE; - } + private OrcaOobUtil() {} /** * Creates a new {@link io.grpc.LoadBalancer.Helper} with provided @@ -112,12 +87,14 @@ public abstract class OrcaOobUtil { * * public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { * // listener implements the logic for WRR's usage of backend metrics. - * OrcaReportingHelperWrapper orcaWrapper = - * OrcaOobUtil.getInstance().newOrcaReportingHelperWrapper(originHelper, listener); - * orcaWrapper.setReportingConfig( - * OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build()); + * OrcaReportingHelper orcaHelper = + * OrcaOobUtil.newOrcaReportingHelper(originHelper); * Subchannel subchannel = - * orcaWrapper.asHelper().createSubchannel(CreateSubchannelArgs.newBuilder()...); + * orcaHelper.createSubchannel(CreateSubchannelArgs.newBuilder()...); + * OrcaOobUtil.setListener( + * subchannel, + * listener, + * OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build()); * ... * } * } @@ -128,8 +105,11 @@ public abstract class OrcaOobUtil { *
    *       {@code
    *       class XdsLoadBalancer extends LoadBalancer {
-   *         private final Helper originHelper;  // the original Helper
+   *         private final Helper orcaHelper;  // the original Helper
    *
+   *         public XdsLoadBalancer(LoadBalancer.Helper helper) {
+   *           this.orcaHelper = OrcaUtil.newOrcaReportingHelper(helper);
+   *         }
    *         private void createChildPolicy(
    *             Locality locality, LoadBalancerProvider childPolicyProvider) {
    *           // Each Locality has a child policy, and the parent does per-locality aggregation by
@@ -137,11 +117,18 @@ public abstract class OrcaOobUtil {
    *
    *           // Create an OrcaReportingHelperWrapper for each Locality.
    *           // listener implements the logic for locality-level backend metric aggregation.
-   *           OrcaReportingHelperWrapper orcaWrapper =
-   *               OrcaOobUtil.getInstance().newOrcaReportingHelperWrapper(originHelper, listener);
-   *           orcaWrapper.setReportingConfig(
-   *               OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build());
-   *           LoadBalancer childLb = childPolicyProvider.newLoadBalancer(orcaWrapper.asHelper());
+   *           LoadBalancer childLb = childPolicyProvider.newLoadBalancer(
+   *             new ForwardingLoadBalancerHelper() {
+   *               public Subchannel createSubchannel(CreateSubchannelArgs args) {
+   *                 Subchannel subchannel = super.createSubchannel(args);
+   *                 OrcaOobUtil.setListener(subchannel, listener,
+   *                 OrcaReportingConfig.newBuilder().setReportInterval(30, SECOND).build());
+   *                 return subchannel;
+   *               }
+   *               public LoadBalancer.Helper delegate() {
+   *                 return orcaHelper;
+   *               }
+   *             });
    *         }
    *       }
    *       }
@@ -151,33 +138,20 @@ public abstract class OrcaOobUtil {
    *
    * @param delegate the delegate helper that provides essentials for establishing subchannels to
    *     backends.
-   * @param listener contains the callback to be invoked when an out-of-band ORCA report is
-   *     received.
    */
-  public abstract OrcaReportingHelperWrapper newOrcaReportingHelperWrapper(
-      LoadBalancer.Helper delegate,
-      OrcaOobReportListener listener);
+  public static LoadBalancer.Helper newOrcaReportingHelper(LoadBalancer.Helper delegate) {
+    return newOrcaReportingHelper(
+        delegate,
+        new ExponentialBackoffPolicy.Provider(),
+        GrpcUtil.STOPWATCH_SUPPLIER);
+  }
 
   @VisibleForTesting
-  static OrcaReportingHelperWrapper newOrcaReportingHelperWrapper(
+  static LoadBalancer.Helper newOrcaReportingHelper(
       LoadBalancer.Helper delegate,
-      OrcaOobReportListener listener,
       BackoffPolicy.Provider backoffPolicyProvider,
       Supplier stopwatchSupplier) {
-    final OrcaReportingHelper orcaHelper =
-        new OrcaReportingHelper(delegate, listener, backoffPolicyProvider, stopwatchSupplier);
-
-    return new OrcaReportingHelperWrapper() {
-      @Override
-      public void setReportingConfig(OrcaReportingConfig config) {
-        orcaHelper.setReportingConfig(config);
-      }
-
-      @Override
-      public Helper asHelper() {
-        return orcaHelper;
-      }
-    };
+    return new OrcaReportingHelper(delegate, backoffPolicyProvider, stopwatchSupplier);
   }
 
   /**
@@ -199,61 +173,57 @@ public abstract class OrcaOobUtil {
     void onLoadReport(OrcaLoadReport report);
   }
 
+  static final Attributes.Key ORCA_REPORTING_STATE_KEY =
+      Attributes.Key.create("internal-orca-reporting-state");
+
   /**
-   * Blueprint for the wrapper that wraps a {@link io.grpc.LoadBalancer.Helper} with the capability
-   * of allowing {@link LoadBalancer}s interested in receiving out-of-band ORCA reports to update
-   * the reporting configuration such as reporting interval.
+   *  Update {@link OrcaOobReportListener} to receive Out-of-Band metrics report for the
+   *  particular subchannel connection, and set the configuration of receiving ORCA reports,
+   *  such as the interval of receiving reports.
+   *
+   * 

This method needs to be called from the SynchronizationContext returned by the wrapped + * helper's {@link Helper#getSynchronizationContext()}. + * + *

Each load balancing policy must call this method to configure the backend load reporting. + * Otherwise, it will not receive ORCA reports. + * + *

If multiple load balancing policies configure reporting with different intervals, reports + * come with the minimum of those intervals. + * + * @param subchannel the server connected by this subchannel to receive the metrics. + * + * @param listener the callback upon receiving backend metrics from the Out-Of-Band stream. + * + * @param config the configuration to be set. + * */ - public abstract static class OrcaReportingHelperWrapper { - - /** - * Sets the configuration of receiving ORCA reports, such as the interval of receiving reports. - * - *

This method needs to be called from the SynchronizationContext returned by the wrapped - * helper's {@link Helper#getSynchronizationContext()}. - * - *

Each load balancing policy must call this method to configure the backend load reporting. - * Otherwise, it will not receive ORCA reports. - * - *

If multiple load balancing policies configure reporting with different intervals, reports - * come with the minimum of those intervals. - * - * @param config the configuration to be set. - */ - public abstract void setReportingConfig(OrcaReportingConfig config); - - /** - * Returns a wrapped {@link io.grpc.LoadBalancer.Helper}. Subchannels created through it will - * retrieve ORCA load reports if the server supports it. - */ - public abstract LoadBalancer.Helper asHelper(); + public static void setListener(Subchannel subchannel, OrcaOobReportListener listener, + OrcaReportingConfig config) { + SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY); + if (orcaSubchannel == null) { + throw new IllegalArgumentException("Subchannel does not have orca Out-Of-Band stream enabled." + + " Try to use a subchannel created by OrcaOobUtil.OrcaHelper."); + } + orcaSubchannel.orcaState.setListener(orcaSubchannel, listener, config); } /** * An {@link OrcaReportingHelper} wraps a delegated {@link LoadBalancer.Helper} with additional * functionality to manage RPCs for out-of-band ORCA reporting for each backend it establishes - * connection to. + * connection to. Subchannels created through it will retrieve ORCA load reports if the server + * supports it. */ - private static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper - implements OrcaOobReportListener { - - private static final CreateSubchannelArgs.Key ORCA_REPORTING_STATE_KEY = - CreateSubchannelArgs.Key.create("internal-orca-reporting-state"); + static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper { private final LoadBalancer.Helper delegate; - private final OrcaOobReportListener listener; private final SynchronizationContext syncContext; private final BackoffPolicy.Provider backoffPolicyProvider; private final Supplier stopwatchSupplier; - private final Set orcaStates = new HashSet<>(); - @Nullable private OrcaReportingConfig orcaConfig; OrcaReportingHelper( LoadBalancer.Helper delegate, - OrcaOobReportListener listener, BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { this.delegate = checkNotNull(delegate, "delegate"); - this.listener = checkNotNull(listener, "listener"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext"); @@ -267,42 +237,17 @@ public abstract class OrcaOobUtil { @Override public Subchannel createSubchannel(CreateSubchannelArgs args) { syncContext.throwIfNotInThisSynchronizationContext(); - OrcaReportingState orcaState = args.getOption(ORCA_REPORTING_STATE_KEY); - boolean augmented = false; - if (orcaState == null) { + Subchannel subchannel = super.createSubchannel(args); + SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY); + OrcaReportingState orcaState; + if (orcaSubchannel == null) { // Only the first load balancing policy requesting ORCA reports instantiates an // OrcaReportingState. - orcaState = new OrcaReportingState(this, syncContext, - delegate().getScheduledExecutorService()); - args = args.toBuilder().addOption(ORCA_REPORTING_STATE_KEY, orcaState).build(); - augmented = true; - } - orcaStates.add(orcaState); - orcaState.listeners.add(this); - Subchannel subchannel = super.createSubchannel(args); - if (augmented) { - subchannel = new SubchannelImpl(subchannel, orcaState); - } - if (orcaConfig != null) { - orcaState.setReportingConfig(this, orcaConfig); - } - return subchannel; - } - - void setReportingConfig(final OrcaReportingConfig config) { - syncContext.throwIfNotInThisSynchronizationContext(); - orcaConfig = config; - for (OrcaReportingState state : orcaStates) { - state.setReportingConfig(OrcaReportingHelper.this, config); - } - } - - @Override - public void onLoadReport(OrcaLoadReport report) { - syncContext.throwIfNotInThisSynchronizationContext(); - if (orcaConfig != null) { - listener.onLoadReport(report); + orcaState = new OrcaReportingState(syncContext, delegate().getScheduledExecutorService()); + } else { + orcaState = orcaSubchannel.orcaState; } + return new SubchannelImpl(subchannel, orcaState); } /** @@ -312,11 +257,9 @@ public abstract class OrcaOobUtil { */ private final class OrcaReportingState implements SubchannelStateListener { - private final OrcaReportingHelper orcaHelper; private final SynchronizationContext syncContext; private final ScheduledExecutorService timeService; - private final List listeners = new ArrayList<>(); - private final Map configs = new HashMap<>(); + private final Map configs = new HashMap<>(); @Nullable private Subchannel subchannel; @Nullable private ChannelLogger subchannelLogger; @Nullable @@ -335,12 +278,11 @@ public abstract class OrcaOobUtil { private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); // True if server returned UNIMPLEMENTED. private boolean disabled; + private boolean started; OrcaReportingState( - OrcaReportingHelper orcaHelper, SynchronizationContext syncContext, ScheduledExecutorService timeService) { - this.orcaHelper = checkNotNull(orcaHelper, "orcaHelper"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timeService = checkNotNull(timeService, "timeService"); } @@ -350,11 +292,27 @@ public abstract class OrcaOobUtil { this.subchannel = checkNotNull(subchannel, "subchannel"); this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger"); this.stateListener = checkNotNull(stateListener, "stateListener"); + started = true; } - void setReportingConfig(OrcaReportingHelper helper, OrcaReportingConfig config) { + void setListener(SubchannelImpl orcaSubchannel, OrcaOobReportListener listener, + OrcaReportingConfig config) { + syncContext.execute(new Runnable() { + @Override + public void run() { + OrcaOobReportListener oldListener = orcaSubchannel.reportListener; + if (oldListener != null) { + configs.remove(oldListener); + } + orcaSubchannel.reportListener = listener; + setReportingConfig(listener, config); + } + }); + } + + private void setReportingConfig(OrcaOobReportListener listener, OrcaReportingConfig config) { boolean reconfigured = false; - configs.put(helper, config); + configs.put(listener, config); // Real reporting interval is the minimum of intervals requested by all participating // helpers. if (overallConfig == null) { @@ -386,9 +344,6 @@ public abstract class OrcaOobUtil { // may be available on the new connection. disabled = false; } - if (Objects.equal(newState.getState(), SHUTDOWN)) { - orcaHelper.orcaStates.remove(this); - } state = newState; adjustOrcaReporting(); // Propagate subchannel state update to downstream listeners. @@ -495,7 +450,7 @@ public abstract class OrcaOobUtil { callHasResponded = true; backoffPolicy = null; subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response); - for (OrcaOobReportListener listener : listeners) { + for (OrcaOobReportListener listener : configs.keySet()) { listener.onLoadReport(response); } call.request(1); @@ -550,9 +505,9 @@ public abstract class OrcaOobUtil { @VisibleForTesting static final class SubchannelImpl extends ForwardingSubchannel { - private final Subchannel delegate; private final OrcaReportingHelper.OrcaReportingState orcaState; + @Nullable private OrcaOobReportListener reportListener; SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) { this.delegate = checkNotNull(delegate, "delegate"); @@ -566,8 +521,17 @@ public abstract class OrcaOobUtil { @Override public void start(SubchannelStateListener listener) { - orcaState.init(this, listener); - super.start(orcaState); + if (!orcaState.started) { + orcaState.init(this, listener); + super.start(orcaState); + } else { + super.start(listener); + } + } + + @Override + public Attributes getAttributes() { + return super.getAttributes().toBuilder().set(ORCA_REPORTING_STATE_KEY, this).build(); } } diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java index 5c6e745785..770cb783b5 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java @@ -23,6 +23,7 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.SHUTDOWN; +import static org.junit.Assert.fail; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -47,6 +48,7 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; @@ -61,9 +63,9 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener; import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig; -import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingHelperWrapper; import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl; import java.net.SocketAddress; import java.text.MessageFormat; @@ -130,9 +132,10 @@ public class OrcaOobUtilTest { @Mock private BackoffPolicy backoffPolicy1; @Mock private BackoffPolicy backoffPolicy2; private FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS]; - private OrcaReportingHelperWrapper orcaHelperWrapper; - private OrcaReportingHelperWrapper parentHelperWrapper; - private OrcaReportingHelperWrapper childHelperWrapper; + private LoadBalancer.Helper orcaHelper; + private LoadBalancer.Helper parentHelper; + private LoadBalancer.Helper childHelper; + private Subchannel savedParentSubchannel; private static FakeSubchannel unwrap(Subchannel s) { return (FakeSubchannel) ((SubchannelImpl) s).delegate(); @@ -201,29 +204,34 @@ public class OrcaOobUtilTest { when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L); when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L); - orcaHelperWrapper = - OrcaOobUtil.newOrcaReportingHelperWrapper( + orcaHelper = + OrcaOobUtil.newOrcaReportingHelper( origHelper, - mockOrcaListener0, backoffPolicyProvider, fakeClock.getStopwatchSupplier()); - parentHelperWrapper = - OrcaOobUtil.newOrcaReportingHelperWrapper( - origHelper, - mockOrcaListener1, - backoffPolicyProvider, - fakeClock.getStopwatchSupplier()); - childHelperWrapper = - OrcaOobUtil.newOrcaReportingHelperWrapper( - parentHelperWrapper.asHelper(), - mockOrcaListener2, + parentHelper = + new ForwardingLoadBalancerHelper() { + @Override + protected Helper delegate() { + return orcaHelper; + } + + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + Subchannel subchannel = super.createSubchannel(args); + savedParentSubchannel = subchannel; + return subchannel; + } + }; + childHelper = + OrcaOobUtil.newOrcaReportingHelper( + parentHelper, backoffPolicyProvider, fakeClock.getStopwatchSupplier()); } @Test public void singlePolicyTypicalWorkflow() { - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); verify(origHelper, atLeast(0)).getSynchronizationContext(); verifyNoMoreInteractions(origHelper); @@ -234,8 +242,9 @@ public class OrcaOobUtilTest { String subchannelAttrValue = "eag attr " + i; Attributes attrs = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); - assertThat(unwrap(createSubchannel(orcaHelperWrapper.asHelper(), i, attrs))) - .isSameInstanceAs(subchannels[i]); + Subchannel created = createSubchannel(orcaHelper, i, attrs); + assertThat(unwrap(created)).isSameInstanceAs(subchannels[i]); + setOrcaReportConfig(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) @@ -304,8 +313,6 @@ public class OrcaOobUtilTest { @Test public void twoLevelPoliciesTypicalWorkflow() { - setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG); - setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG); verify(origHelper, atLeast(0)).getSynchronizationContext(); verifyNoMoreInteractions(origHelper); @@ -316,8 +323,9 @@ public class OrcaOobUtilTest { String subchannelAttrValue = "eag attr " + i; Attributes attrs = Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); - assertThat(unwrap(createSubchannel(childHelperWrapper.asHelper(), i, attrs))) - .isSameInstanceAs(subchannels[i]); + Subchannel created = createSubchannel(childHelper, i, attrs); + assertThat(unwrap(((SubchannelImpl) created).delegate())).isSameInstanceAs(subchannels[i]); + OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG); verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) @@ -362,7 +370,6 @@ public class OrcaOobUtilTest { serverCall.responseObserver.onNext(report); assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); verify(mockOrcaListener1, times(i + 1)).onLoadReport(eq(report)); - verify(mockOrcaListener2, times(i + 1)).onLoadReport(eq(report)); } for (int i = 0; i < NUM_SUBCHANNELS; i++) { @@ -390,8 +397,8 @@ public class OrcaOobUtilTest { @Test public void orcReportingDisabledWhenServiceNotImplemented() { - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); + final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); FakeSubchannel subchannel = subchannels[0]; OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0]; SubchannelStateListener mockStateListener = mockStateListeners[0]; @@ -425,8 +432,8 @@ public class OrcaOobUtilTest { @Test public void orcaReportingStreamClosedAndRetried() { - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); + final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); FakeSubchannel subchannel = subchannels[0]; OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0]; SubchannelStateListener mockStateListener = mockStateListeners[0]; @@ -491,14 +498,14 @@ public class OrcaOobUtilTest { @Test public void reportingNotStartedUntilConfigured() { - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); + Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); verify(mockStateListeners[0]) .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); assertThat(orcaServiceImps[0].calls).isEmpty(); assertThat(subchannels[0].logs).isEmpty(); - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls).hasSize(1); assertLog(subchannels[0].logs, "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); @@ -506,10 +513,33 @@ public class OrcaOobUtilTest { .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); } + @Test + public void updateListenerThrows() { + Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); + deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); + verify(mockStateListeners[0]) + .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); + + assertThat(orcaServiceImps[0].calls).hasSize(1); + assertLog(subchannels[0].logs, + "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); + assertThat(orcaServiceImps[0].calls.peek().request) + .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); + assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]); + try { + OrcaOobUtil.setListener(subchannels[0], mockOrcaListener1, MEDIUM_INTERVAL_CONFIG); + fail("Update orca listener on non-orca subchannel should fail"); + } catch (IllegalArgumentException ex) { + assertThat(ex.getMessage()).isEqualTo("Subchannel does not have orca Out-Of-Band " + + "stream enabled. Try to use a subchannel created by OrcaOobUtil.OrcaHelper."); + } + } + @Test public void updateReportingIntervalBeforeCreatingSubchannel() { - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); + Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); @@ -522,8 +552,8 @@ public class OrcaOobUtilTest { @Test public void updateReportingIntervalBeforeSubchannelReady() { - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); + Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); @@ -538,8 +568,9 @@ public class OrcaOobUtilTest { public void updateReportingIntervalWhenRpcActive() { // Sets report interval before creating a Subchannel, reporting starts right after suchannel // state becomes READY. - setOrcaReportConfig(orcaHelperWrapper, MEDIUM_INTERVAL_CONFIG); - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); + Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, + MEDIUM_INTERVAL_CONFIG); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); @@ -550,7 +581,7 @@ public class OrcaOobUtilTest { .isEqualTo(buildOrcaRequestFromConfig(MEDIUM_INTERVAL_CONFIG)); // Make reporting less frequent. - setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); assertThat(orcaServiceImps[0].calls).hasSize(1); assertLog(subchannels[0].logs, @@ -559,12 +590,13 @@ public class OrcaOobUtilTest { .isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG)); // Configuring with the same report interval again does not restart ORCA RPC. - setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse(); assertThat(subchannels[0].logs).isEmpty(); // Make reporting more frequent. - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener0, + SHORT_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); assertThat(orcaServiceImps[0].calls).hasSize(1); assertLog(subchannels[0].logs, @@ -575,8 +607,8 @@ public class OrcaOobUtilTest { @Test public void updateReportingIntervalWhenRpcPendingRetry() { - createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); - setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); + Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); @@ -596,7 +628,7 @@ public class OrcaOobUtilTest { assertThat(orcaServiceImps[0].calls).isEmpty(); // Make reporting less frequent. - setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); // Retry task will be canceled and restarts new RPC immediately. assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(orcaServiceImps[0].calls).hasSize(1); @@ -608,7 +640,7 @@ public class OrcaOobUtilTest { @Test public void policiesReceiveSameReportIndependently() { - createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY); + Subchannel childSubchannel = createSubchannel(childHelper, 0, Attributes.EMPTY); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); // No helper sets ORCA reporting interval, so load reporting is not started. @@ -617,7 +649,7 @@ public class OrcaOobUtilTest { assertThat(subchannels[0].logs).isEmpty(); // Parent helper requests ORCA reports with a certain interval, load reporting starts. - setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG); + OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener1, SHORT_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls).hasSize(1); assertLog(subchannels[0].logs, "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); @@ -633,7 +665,7 @@ public class OrcaOobUtilTest { verifyNoMoreInteractions(mockOrcaListener2); // Now child helper also wants to receive reports. - setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG); + OrcaOobUtil.setListener(childSubchannel, mockOrcaListener2, SHORT_INTERVAL_CONFIG); orcaServiceImps[0].calls.peek().responseObserver.onNext(report); assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report); // Both helper receives the same report instance. @@ -647,9 +679,9 @@ public class OrcaOobUtilTest { @Test public void reportWithMostFrequentIntervalRequested() { - setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG); - setOrcaReportConfig(childHelperWrapper, LONG_INTERVAL_CONFIG); - createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY); + Subchannel created = createSubchannel(childHelper, 0, Attributes.EMPTY); + OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); assertThat(orcaServiceImps[0].calls).hasSize(1); @@ -660,14 +692,14 @@ public class OrcaOobUtilTest { assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval())) .isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos()); - // Child helper wants reporting to be more frequent than its current setting while it is still + // Parent helper wants reporting to be more frequent than its current setting while it is still // less frequent than parent helper. Nothing should happen on existing RPC. - setOrcaReportConfig(childHelperWrapper, MEDIUM_INTERVAL_CONFIG); + OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener0, MEDIUM_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse(); assertThat(subchannels[0].logs).isEmpty(); // Parent helper wants reporting to be less frequent. - setOrcaReportConfig(parentHelperWrapper, MEDIUM_INTERVAL_CONFIG); + OrcaOobUtil.setListener(created, mockOrcaListener1, MEDIUM_INTERVAL_CONFIG); assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); assertThat(orcaServiceImps[0].calls).hasSize(1); assertLog(subchannels[0].logs, @@ -720,13 +752,10 @@ public class OrcaOobUtilTest { } private void setOrcaReportConfig( - final OrcaReportingHelperWrapper helperWrapper, final OrcaReportingConfig config) { - syncContext.execute(new Runnable() { - @Override - public void run() { - helperWrapper.setReportingConfig(config); - } - }); + final Subchannel subchannel, + final OrcaOobReportListener listener, + final OrcaReportingConfig config) { + OrcaOobUtil.setListener(subchannel, listener, config); } private static final class OpenRcaServiceImp extends OpenRcaServiceGrpc.OpenRcaServiceImplBase {