diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java index ed344aab73..0e88c02cd1 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; -import static io.grpc.ConnectivityState.SHUTDOWN; import com.github.xds.data.orca.v3.OrcaLoadReport; import com.github.xds.service.orca.v3.OpenRcaServiceGrpc; @@ -31,6 +30,7 @@ import com.google.common.base.Objects; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.protobuf.util.Durations; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ChannelLogger; @@ -53,12 +53,8 @@ import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; -import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -67,34 +63,13 @@ import javax.annotation.Nullable; /** * Utility class that provides method for {@link LoadBalancer} to install listeners to receive - * out-of-band backend cost metrics in the format of Open Request Cost Aggregation (ORCA). + * out-of-band backend metrics in the format of Open Request Cost Aggregation (ORCA). */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129") -public abstract class OrcaOobUtil { - +public final class OrcaOobUtil { private static final Logger logger = Logger.getLogger(OrcaPerRequestUtil.class.getName()); - private static final OrcaOobUtil DEFAULT_INSTANCE = - new OrcaOobUtil() { - @Override - public OrcaReportingHelperWrapper newOrcaReportingHelperWrapper( - LoadBalancer.Helper delegate, - OrcaOobReportListener listener) { - return newOrcaReportingHelperWrapper( - delegate, - listener, - new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER); - } - }; - - /** - * Gets an {@code OrcaOobUtil} instance that provides actual implementation of - * {@link #newOrcaReportingHelperWrapper}. - */ - public static OrcaOobUtil getInstance() { - return DEFAULT_INSTANCE; - } + private OrcaOobUtil() {} /** * Creates a new {@link io.grpc.LoadBalancer.Helper} with provided @@ -112,12 +87,14 @@ public abstract class OrcaOobUtil { * * public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { * // listener implements the logic for WRR's usage of backend metrics. - * OrcaReportingHelperWrapper orcaWrapper = - * OrcaOobUtil.getInstance().newOrcaReportingHelperWrapper(originHelper, listener); - * orcaWrapper.setReportingConfig( - * OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build()); + * OrcaReportingHelper orcaHelper = + * OrcaOobUtil.newOrcaReportingHelper(originHelper); * Subchannel subchannel = - * orcaWrapper.asHelper().createSubchannel(CreateSubchannelArgs.newBuilder()...); + * orcaHelper.createSubchannel(CreateSubchannelArgs.newBuilder()...); + * OrcaOobUtil.setListener( + * subchannel, + * listener, + * OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build()); * ... * } * } @@ -128,8 +105,11 @@ public abstract class OrcaOobUtil { *
* {@code
* class XdsLoadBalancer extends LoadBalancer {
- * private final Helper originHelper; // the original Helper
+ * private final Helper orcaHelper; // the original Helper
*
+ * public XdsLoadBalancer(LoadBalancer.Helper helper) {
+ * this.orcaHelper = OrcaUtil.newOrcaReportingHelper(helper);
+ * }
* private void createChildPolicy(
* Locality locality, LoadBalancerProvider childPolicyProvider) {
* // Each Locality has a child policy, and the parent does per-locality aggregation by
@@ -137,11 +117,18 @@ public abstract class OrcaOobUtil {
*
* // Create an OrcaReportingHelperWrapper for each Locality.
* // listener implements the logic for locality-level backend metric aggregation.
- * OrcaReportingHelperWrapper orcaWrapper =
- * OrcaOobUtil.getInstance().newOrcaReportingHelperWrapper(originHelper, listener);
- * orcaWrapper.setReportingConfig(
- * OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build());
- * LoadBalancer childLb = childPolicyProvider.newLoadBalancer(orcaWrapper.asHelper());
+ * LoadBalancer childLb = childPolicyProvider.newLoadBalancer(
+ * new ForwardingLoadBalancerHelper() {
+ * public Subchannel createSubchannel(CreateSubchannelArgs args) {
+ * Subchannel subchannel = super.createSubchannel(args);
+ * OrcaOobUtil.setListener(subchannel, listener,
+ * OrcaReportingConfig.newBuilder().setReportInterval(30, SECOND).build());
+ * return subchannel;
+ * }
+ * public LoadBalancer.Helper delegate() {
+ * return orcaHelper;
+ * }
+ * });
* }
* }
* }
@@ -151,33 +138,20 @@ public abstract class OrcaOobUtil {
*
* @param delegate the delegate helper that provides essentials for establishing subchannels to
* backends.
- * @param listener contains the callback to be invoked when an out-of-band ORCA report is
- * received.
*/
- public abstract OrcaReportingHelperWrapper newOrcaReportingHelperWrapper(
- LoadBalancer.Helper delegate,
- OrcaOobReportListener listener);
+ public static LoadBalancer.Helper newOrcaReportingHelper(LoadBalancer.Helper delegate) {
+ return newOrcaReportingHelper(
+ delegate,
+ new ExponentialBackoffPolicy.Provider(),
+ GrpcUtil.STOPWATCH_SUPPLIER);
+ }
@VisibleForTesting
- static OrcaReportingHelperWrapper newOrcaReportingHelperWrapper(
+ static LoadBalancer.Helper newOrcaReportingHelper(
LoadBalancer.Helper delegate,
- OrcaOobReportListener listener,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier stopwatchSupplier) {
- final OrcaReportingHelper orcaHelper =
- new OrcaReportingHelper(delegate, listener, backoffPolicyProvider, stopwatchSupplier);
-
- return new OrcaReportingHelperWrapper() {
- @Override
- public void setReportingConfig(OrcaReportingConfig config) {
- orcaHelper.setReportingConfig(config);
- }
-
- @Override
- public Helper asHelper() {
- return orcaHelper;
- }
- };
+ return new OrcaReportingHelper(delegate, backoffPolicyProvider, stopwatchSupplier);
}
/**
@@ -199,61 +173,57 @@ public abstract class OrcaOobUtil {
void onLoadReport(OrcaLoadReport report);
}
+ static final Attributes.Key ORCA_REPORTING_STATE_KEY =
+ Attributes.Key.create("internal-orca-reporting-state");
+
/**
- * Blueprint for the wrapper that wraps a {@link io.grpc.LoadBalancer.Helper} with the capability
- * of allowing {@link LoadBalancer}s interested in receiving out-of-band ORCA reports to update
- * the reporting configuration such as reporting interval.
+ * Update {@link OrcaOobReportListener} to receive Out-of-Band metrics report for the
+ * particular subchannel connection, and set the configuration of receiving ORCA reports,
+ * such as the interval of receiving reports.
+ *
+ * This method needs to be called from the SynchronizationContext returned by the wrapped
+ * helper's {@link Helper#getSynchronizationContext()}.
+ *
+ *
Each load balancing policy must call this method to configure the backend load reporting.
+ * Otherwise, it will not receive ORCA reports.
+ *
+ *
If multiple load balancing policies configure reporting with different intervals, reports
+ * come with the minimum of those intervals.
+ *
+ * @param subchannel the server connected by this subchannel to receive the metrics.
+ *
+ * @param listener the callback upon receiving backend metrics from the Out-Of-Band stream.
+ *
+ * @param config the configuration to be set.
+ *
*/
- public abstract static class OrcaReportingHelperWrapper {
-
- /**
- * Sets the configuration of receiving ORCA reports, such as the interval of receiving reports.
- *
- *
This method needs to be called from the SynchronizationContext returned by the wrapped
- * helper's {@link Helper#getSynchronizationContext()}.
- *
- *
Each load balancing policy must call this method to configure the backend load reporting.
- * Otherwise, it will not receive ORCA reports.
- *
- *
If multiple load balancing policies configure reporting with different intervals, reports
- * come with the minimum of those intervals.
- *
- * @param config the configuration to be set.
- */
- public abstract void setReportingConfig(OrcaReportingConfig config);
-
- /**
- * Returns a wrapped {@link io.grpc.LoadBalancer.Helper}. Subchannels created through it will
- * retrieve ORCA load reports if the server supports it.
- */
- public abstract LoadBalancer.Helper asHelper();
+ public static void setListener(Subchannel subchannel, OrcaOobReportListener listener,
+ OrcaReportingConfig config) {
+ SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY);
+ if (orcaSubchannel == null) {
+ throw new IllegalArgumentException("Subchannel does not have orca Out-Of-Band stream enabled."
+ + " Try to use a subchannel created by OrcaOobUtil.OrcaHelper.");
+ }
+ orcaSubchannel.orcaState.setListener(orcaSubchannel, listener, config);
}
/**
* An {@link OrcaReportingHelper} wraps a delegated {@link LoadBalancer.Helper} with additional
* functionality to manage RPCs for out-of-band ORCA reporting for each backend it establishes
- * connection to.
+ * connection to. Subchannels created through it will retrieve ORCA load reports if the server
+ * supports it.
*/
- private static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper
- implements OrcaOobReportListener {
-
- private static final CreateSubchannelArgs.Key ORCA_REPORTING_STATE_KEY =
- CreateSubchannelArgs.Key.create("internal-orca-reporting-state");
+ static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper {
private final LoadBalancer.Helper delegate;
- private final OrcaOobReportListener listener;
private final SynchronizationContext syncContext;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier stopwatchSupplier;
- private final Set orcaStates = new HashSet<>();
- @Nullable private OrcaReportingConfig orcaConfig;
OrcaReportingHelper(
LoadBalancer.Helper delegate,
- OrcaOobReportListener listener,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier stopwatchSupplier) {
this.delegate = checkNotNull(delegate, "delegate");
- this.listener = checkNotNull(listener, "listener");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
@@ -267,42 +237,17 @@ public abstract class OrcaOobUtil {
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
syncContext.throwIfNotInThisSynchronizationContext();
- OrcaReportingState orcaState = args.getOption(ORCA_REPORTING_STATE_KEY);
- boolean augmented = false;
- if (orcaState == null) {
+ Subchannel subchannel = super.createSubchannel(args);
+ SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY);
+ OrcaReportingState orcaState;
+ if (orcaSubchannel == null) {
// Only the first load balancing policy requesting ORCA reports instantiates an
// OrcaReportingState.
- orcaState = new OrcaReportingState(this, syncContext,
- delegate().getScheduledExecutorService());
- args = args.toBuilder().addOption(ORCA_REPORTING_STATE_KEY, orcaState).build();
- augmented = true;
- }
- orcaStates.add(orcaState);
- orcaState.listeners.add(this);
- Subchannel subchannel = super.createSubchannel(args);
- if (augmented) {
- subchannel = new SubchannelImpl(subchannel, orcaState);
- }
- if (orcaConfig != null) {
- orcaState.setReportingConfig(this, orcaConfig);
- }
- return subchannel;
- }
-
- void setReportingConfig(final OrcaReportingConfig config) {
- syncContext.throwIfNotInThisSynchronizationContext();
- orcaConfig = config;
- for (OrcaReportingState state : orcaStates) {
- state.setReportingConfig(OrcaReportingHelper.this, config);
- }
- }
-
- @Override
- public void onLoadReport(OrcaLoadReport report) {
- syncContext.throwIfNotInThisSynchronizationContext();
- if (orcaConfig != null) {
- listener.onLoadReport(report);
+ orcaState = new OrcaReportingState(syncContext, delegate().getScheduledExecutorService());
+ } else {
+ orcaState = orcaSubchannel.orcaState;
}
+ return new SubchannelImpl(subchannel, orcaState);
}
/**
@@ -312,11 +257,9 @@ public abstract class OrcaOobUtil {
*/
private final class OrcaReportingState implements SubchannelStateListener {
- private final OrcaReportingHelper orcaHelper;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
- private final List listeners = new ArrayList<>();
- private final Map configs = new HashMap<>();
+ private final Map configs = new HashMap<>();
@Nullable private Subchannel subchannel;
@Nullable private ChannelLogger subchannelLogger;
@Nullable
@@ -335,12 +278,11 @@ public abstract class OrcaOobUtil {
private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
// True if server returned UNIMPLEMENTED.
private boolean disabled;
+ private boolean started;
OrcaReportingState(
- OrcaReportingHelper orcaHelper,
SynchronizationContext syncContext,
ScheduledExecutorService timeService) {
- this.orcaHelper = checkNotNull(orcaHelper, "orcaHelper");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
}
@@ -350,11 +292,27 @@ public abstract class OrcaOobUtil {
this.subchannel = checkNotNull(subchannel, "subchannel");
this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
this.stateListener = checkNotNull(stateListener, "stateListener");
+ started = true;
}
- void setReportingConfig(OrcaReportingHelper helper, OrcaReportingConfig config) {
+ void setListener(SubchannelImpl orcaSubchannel, OrcaOobReportListener listener,
+ OrcaReportingConfig config) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ OrcaOobReportListener oldListener = orcaSubchannel.reportListener;
+ if (oldListener != null) {
+ configs.remove(oldListener);
+ }
+ orcaSubchannel.reportListener = listener;
+ setReportingConfig(listener, config);
+ }
+ });
+ }
+
+ private void setReportingConfig(OrcaOobReportListener listener, OrcaReportingConfig config) {
boolean reconfigured = false;
- configs.put(helper, config);
+ configs.put(listener, config);
// Real reporting interval is the minimum of intervals requested by all participating
// helpers.
if (overallConfig == null) {
@@ -386,9 +344,6 @@ public abstract class OrcaOobUtil {
// may be available on the new connection.
disabled = false;
}
- if (Objects.equal(newState.getState(), SHUTDOWN)) {
- orcaHelper.orcaStates.remove(this);
- }
state = newState;
adjustOrcaReporting();
// Propagate subchannel state update to downstream listeners.
@@ -495,7 +450,7 @@ public abstract class OrcaOobUtil {
callHasResponded = true;
backoffPolicy = null;
subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
- for (OrcaOobReportListener listener : listeners) {
+ for (OrcaOobReportListener listener : configs.keySet()) {
listener.onLoadReport(response);
}
call.request(1);
@@ -550,9 +505,9 @@ public abstract class OrcaOobUtil {
@VisibleForTesting
static final class SubchannelImpl extends ForwardingSubchannel {
-
private final Subchannel delegate;
private final OrcaReportingHelper.OrcaReportingState orcaState;
+ @Nullable private OrcaOobReportListener reportListener;
SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) {
this.delegate = checkNotNull(delegate, "delegate");
@@ -566,8 +521,17 @@ public abstract class OrcaOobUtil {
@Override
public void start(SubchannelStateListener listener) {
- orcaState.init(this, listener);
- super.start(orcaState);
+ if (!orcaState.started) {
+ orcaState.init(this, listener);
+ super.start(orcaState);
+ } else {
+ super.start(listener);
+ }
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return super.getAttributes().toBuilder().set(ORCA_REPORTING_STATE_KEY, this).build();
}
}
diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java
index 5c6e745785..770cb783b5 100644
--- a/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java
+++ b/xds/src/test/java/io/grpc/xds/orca/OrcaOobUtilTest.java
@@ -23,6 +23,7 @@ import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
+import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
@@ -47,6 +48,7 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
@@ -61,9 +63,9 @@ import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig;
-import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingHelperWrapper;
import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl;
import java.net.SocketAddress;
import java.text.MessageFormat;
@@ -130,9 +132,10 @@ public class OrcaOobUtilTest {
@Mock private BackoffPolicy backoffPolicy1;
@Mock private BackoffPolicy backoffPolicy2;
private FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS];
- private OrcaReportingHelperWrapper orcaHelperWrapper;
- private OrcaReportingHelperWrapper parentHelperWrapper;
- private OrcaReportingHelperWrapper childHelperWrapper;
+ private LoadBalancer.Helper orcaHelper;
+ private LoadBalancer.Helper parentHelper;
+ private LoadBalancer.Helper childHelper;
+ private Subchannel savedParentSubchannel;
private static FakeSubchannel unwrap(Subchannel s) {
return (FakeSubchannel) ((SubchannelImpl) s).delegate();
@@ -201,29 +204,34 @@ public class OrcaOobUtilTest {
when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L);
when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L);
- orcaHelperWrapper =
- OrcaOobUtil.newOrcaReportingHelperWrapper(
+ orcaHelper =
+ OrcaOobUtil.newOrcaReportingHelper(
origHelper,
- mockOrcaListener0,
backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
- parentHelperWrapper =
- OrcaOobUtil.newOrcaReportingHelperWrapper(
- origHelper,
- mockOrcaListener1,
- backoffPolicyProvider,
- fakeClock.getStopwatchSupplier());
- childHelperWrapper =
- OrcaOobUtil.newOrcaReportingHelperWrapper(
- parentHelperWrapper.asHelper(),
- mockOrcaListener2,
+ parentHelper =
+ new ForwardingLoadBalancerHelper() {
+ @Override
+ protected Helper delegate() {
+ return orcaHelper;
+ }
+
+ @Override
+ public Subchannel createSubchannel(CreateSubchannelArgs args) {
+ Subchannel subchannel = super.createSubchannel(args);
+ savedParentSubchannel = subchannel;
+ return subchannel;
+ }
+ };
+ childHelper =
+ OrcaOobUtil.newOrcaReportingHelper(
+ parentHelper,
backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
}
@Test
public void singlePolicyTypicalWorkflow() {
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
verify(origHelper, atLeast(0)).getSynchronizationContext();
verifyNoMoreInteractions(origHelper);
@@ -234,8 +242,9 @@ public class OrcaOobUtilTest {
String subchannelAttrValue = "eag attr " + i;
Attributes attrs =
Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
- assertThat(unwrap(createSubchannel(orcaHelperWrapper.asHelper(), i, attrs)))
- .isSameInstanceAs(subchannels[i]);
+ Subchannel created = createSubchannel(orcaHelper, i, attrs);
+ assertThat(unwrap(created)).isSameInstanceAs(subchannels[i]);
+ setOrcaReportConfig(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
@@ -304,8 +313,6 @@ public class OrcaOobUtilTest {
@Test
public void twoLevelPoliciesTypicalWorkflow() {
- setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG);
- setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG);
verify(origHelper, atLeast(0)).getSynchronizationContext();
verifyNoMoreInteractions(origHelper);
@@ -316,8 +323,9 @@ public class OrcaOobUtilTest {
String subchannelAttrValue = "eag attr " + i;
Attributes attrs =
Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
- assertThat(unwrap(createSubchannel(childHelperWrapper.asHelper(), i, attrs)))
- .isSameInstanceAs(subchannels[i]);
+ Subchannel created = createSubchannel(childHelper, i, attrs);
+ assertThat(unwrap(((SubchannelImpl) created).delegate())).isSameInstanceAs(subchannels[i]);
+ OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
@@ -362,7 +370,6 @@ public class OrcaOobUtilTest {
serverCall.responseObserver.onNext(report);
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
verify(mockOrcaListener1, times(i + 1)).onLoadReport(eq(report));
- verify(mockOrcaListener2, times(i + 1)).onLoadReport(eq(report));
}
for (int i = 0; i < NUM_SUBCHANNELS; i++) {
@@ -390,8 +397,8 @@ public class OrcaOobUtilTest {
@Test
public void orcReportingDisabledWhenServiceNotImplemented() {
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
FakeSubchannel subchannel = subchannels[0];
OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0];
SubchannelStateListener mockStateListener = mockStateListeners[0];
@@ -425,8 +432,8 @@ public class OrcaOobUtilTest {
@Test
public void orcaReportingStreamClosedAndRetried() {
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
FakeSubchannel subchannel = subchannels[0];
OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0];
SubchannelStateListener mockStateListener = mockStateListeners[0];
@@ -491,14 +498,14 @@ public class OrcaOobUtilTest {
@Test
public void reportingNotStartedUntilConfigured() {
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0])
.onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(orcaServiceImps[0].calls).isEmpty();
assertThat(subchannels[0].logs).isEmpty();
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
@@ -506,10 +513,33 @@ public class OrcaOobUtilTest {
.isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
}
+ @Test
+ public void updateListenerThrows() {
+ Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
+ deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
+ verify(mockStateListeners[0])
+ .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
+
+ assertThat(orcaServiceImps[0].calls).hasSize(1);
+ assertLog(subchannels[0].logs,
+ "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
+ assertThat(orcaServiceImps[0].calls.peek().request)
+ .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
+ assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]);
+ try {
+ OrcaOobUtil.setListener(subchannels[0], mockOrcaListener1, MEDIUM_INTERVAL_CONFIG);
+ fail("Update orca listener on non-orca subchannel should fail");
+ } catch (IllegalArgumentException ex) {
+ assertThat(ex.getMessage()).isEqualTo("Subchannel does not have orca Out-Of-Band "
+ + "stream enabled. Try to use a subchannel created by OrcaOobUtil.OrcaHelper.");
+ }
+ }
+
@Test
public void updateReportingIntervalBeforeCreatingSubchannel() {
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@@ -522,8 +552,8 @@ public class OrcaOobUtilTest {
@Test
public void updateReportingIntervalBeforeSubchannelReady() {
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
+ Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@@ -538,8 +568,9 @@ public class OrcaOobUtilTest {
public void updateReportingIntervalWhenRpcActive() {
// Sets report interval before creating a Subchannel, reporting starts right after suchannel
// state becomes READY.
- setOrcaReportConfig(orcaHelperWrapper, MEDIUM_INTERVAL_CONFIG);
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0,
+ MEDIUM_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@@ -550,7 +581,7 @@ public class OrcaOobUtilTest {
.isEqualTo(buildOrcaRequestFromConfig(MEDIUM_INTERVAL_CONFIG));
// Make reporting less frequent.
- setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
@@ -559,12 +590,13 @@ public class OrcaOobUtilTest {
.isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG));
// Configuring with the same report interval again does not restart ORCA RPC.
- setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
assertThat(subchannels[0].logs).isEmpty();
// Make reporting more frequent.
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener0,
+ SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
@@ -575,8 +607,8 @@ public class OrcaOobUtilTest {
@Test
public void updateReportingIntervalWhenRpcPendingRetry() {
- createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
- setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
+ Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@@ -596,7 +628,7 @@ public class OrcaOobUtilTest {
assertThat(orcaServiceImps[0].calls).isEmpty();
// Make reporting less frequent.
- setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
// Retry task will be canceled and restarts new RPC immediately.
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertThat(orcaServiceImps[0].calls).hasSize(1);
@@ -608,7 +640,7 @@ public class OrcaOobUtilTest {
@Test
public void policiesReceiveSameReportIndependently() {
- createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ Subchannel childSubchannel = createSubchannel(childHelper, 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
// No helper sets ORCA reporting interval, so load reporting is not started.
@@ -617,7 +649,7 @@ public class OrcaOobUtilTest {
assertThat(subchannels[0].logs).isEmpty();
// Parent helper requests ORCA reports with a certain interval, load reporting starts.
- setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
@@ -633,7 +665,7 @@ public class OrcaOobUtilTest {
verifyNoMoreInteractions(mockOrcaListener2);
// Now child helper also wants to receive reports.
- setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(childSubchannel, mockOrcaListener2, SHORT_INTERVAL_CONFIG);
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Both helper receives the same report instance.
@@ -647,9 +679,9 @@ public class OrcaOobUtilTest {
@Test
public void reportWithMostFrequentIntervalRequested() {
- setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG);
- setOrcaReportConfig(childHelperWrapper, LONG_INTERVAL_CONFIG);
- createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY);
+ Subchannel created = createSubchannel(childHelper, 0, Attributes.EMPTY);
+ OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(orcaServiceImps[0].calls).hasSize(1);
@@ -660,14 +692,14 @@ public class OrcaOobUtilTest {
assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval()))
.isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos());
- // Child helper wants reporting to be more frequent than its current setting while it is still
+ // Parent helper wants reporting to be more frequent than its current setting while it is still
// less frequent than parent helper. Nothing should happen on existing RPC.
- setOrcaReportConfig(childHelperWrapper, MEDIUM_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener0, MEDIUM_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
assertThat(subchannels[0].logs).isEmpty();
// Parent helper wants reporting to be less frequent.
- setOrcaReportConfig(parentHelperWrapper, MEDIUM_INTERVAL_CONFIG);
+ OrcaOobUtil.setListener(created, mockOrcaListener1, MEDIUM_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
@@ -720,13 +752,10 @@ public class OrcaOobUtilTest {
}
private void setOrcaReportConfig(
- final OrcaReportingHelperWrapper helperWrapper, final OrcaReportingConfig config) {
- syncContext.execute(new Runnable() {
- @Override
- public void run() {
- helperWrapper.setReportingConfig(config);
- }
- });
+ final Subchannel subchannel,
+ final OrcaOobReportListener listener,
+ final OrcaReportingConfig config) {
+ OrcaOobUtil.setListener(subchannel, listener, config);
}
private static final class OpenRcaServiceImp extends OpenRcaServiceGrpc.OpenRcaServiceImplBase {